Performance and stability improvements to streaming (#1156)
authorMarcel van der Veldt <m.vanderveldt@outlook.com>
Thu, 21 Mar 2024 00:49:27 +0000 (01:49 +0100)
committerGitHub <noreply@github.com>
Thu, 21 Mar 2024 00:49:27 +0000 (01:49 +0100)
24 files changed:
music_assistant/__main__.py
music_assistant/client/client.py
music_assistant/server/controllers/streams.py
music_assistant/server/controllers/webserver.py
music_assistant/server/helpers/audio.py
music_assistant/server/helpers/auth.py
music_assistant/server/helpers/process.py
music_assistant/server/helpers/tags.py
music_assistant/server/providers/airplay/__init__.py
music_assistant/server/providers/airplay/bin/cliraop-linux-aarch64
music_assistant/server/providers/airplay/bin/cliraop-linux-x86_64
music_assistant/server/providers/airplay/bin/cliraop-macos-arm64
music_assistant/server/providers/chromecast/__init__.py
music_assistant/server/providers/dlna/__init__.py
music_assistant/server/providers/fully_kiosk/__init__.py
music_assistant/server/providers/hass_players/__init__.py
music_assistant/server/providers/slimproto/__init__.py
music_assistant/server/providers/snapcast/__init__.py
music_assistant/server/providers/snapcast/manifest.json
music_assistant/server/providers/sonos/__init__.py
music_assistant/server/providers/spotify/__init__.py
music_assistant/server/providers/ugp/__init__.py
music_assistant/server/server.py
requirements_all.txt

index 7d1cf1f1ab1cf8d490f3213a51bc83a3adaef379..712adc3f2d2ddfec394bc0f5bb2da0acfc2064f2 100644 (file)
@@ -29,14 +29,6 @@ MAX_LOG_FILESIZE = 1000000 * 10  # 10 MB
 ALPINE_RELEASE_FILE = "/etc/alpine-release"
 
 
-class VerboseLogger(logging.Logger):
-    """Custom python logger with included verbose log level."""
-
-    def verbose(self, msg, *args, **kwargs):
-        """Log a verbose message."""
-        self.log(VERBOSE_LOG_LEVEL, msg, *args, **kwargs)
-
-
 def get_arguments():
     """Arguments handling."""
     parser = argparse.ArgumentParser(description="MusicAssistant")
@@ -103,7 +95,6 @@ def setup_logger(data_path: str, level: str = "DEBUG"):
     logger = logging.getLogger()
     logger.addHandler(file_handler)
     logging.addLevelName(VERBOSE_LOG_LEVEL, "VERBOSE")
-    logging.setLoggerClass(VerboseLogger)
 
     # apply the configured global log level to the (root) music assistant logger
     logging.getLogger(ROOT_LOGGER_NAME).setLevel(level)
index d3daa74da1f4d50f91a87c2e68299e601e4ad85c..d8e70e319b8049d50b5257834f7f7ee2f9b42d8c 100644 (file)
@@ -9,11 +9,7 @@ import uuid
 from collections.abc import Callable
 from typing import TYPE_CHECKING, Any
 
-from music_assistant.client.exceptions import (
-    ConnectionClosed,
-    InvalidServerVersion,
-    InvalidState,
-)
+from music_assistant.client.exceptions import ConnectionClosed, InvalidServerVersion, InvalidState
 from music_assistant.common.models.api import (
     ChunkedResultMessage,
     CommandMessage,
@@ -299,9 +295,12 @@ class MusicAssistantClient:
         return self
 
     async def __aexit__(
-        self, exc_type: Exception, exc_value: str, traceback: TracebackType
-    ) -> None:
-        """Disconnect from the server and exit."""
+        self,
+        exc_type: type[BaseException] | None,
+        exc_val: BaseException | None,
+        exc_tb: TracebackType | None,
+    ) -> bool | None:
+        """Exit context manager."""
         await self.disconnect()
 
     def __repr__(self) -> str:
index f0f706ff184b96788b7229153d0a5db3a0c3c858..1b9e2fd9acf2021dc4e0e7f3c557c2b06b5742ad 100644 (file)
@@ -13,17 +13,13 @@ import logging
 import time
 import urllib.parse
 from collections.abc import AsyncGenerator
+from contextlib import asynccontextmanager
 from typing import TYPE_CHECKING
 
 import shortuuid
 from aiohttp import web
 
-from music_assistant.common.helpers.util import (
-    empty_queue,
-    get_ip,
-    select_free_port,
-    try_parse_bool,
-)
+from music_assistant.common.helpers.util import get_ip, select_free_port, try_parse_bool
 from music_assistant.common.models.config_entries import (
     ConfigEntry,
     ConfigValueOption,
@@ -41,15 +37,18 @@ from music_assistant.constants import (
     CONF_OUTPUT_CHANNELS,
     CONF_PUBLISH_IP,
     SILENCE_FILE,
+    VERBOSE_LOG_LEVEL,
 )
 from music_assistant.server.helpers.audio import LOGGER as AUDIO_LOGGER
 from music_assistant.server.helpers.audio import (
     check_audio_support,
     crossfade_pcm_parts,
+    get_ffmpeg_args,
     get_ffmpeg_stream,
     get_media_stream,
     get_player_filter_params,
 )
+from music_assistant.server.helpers.process import AsyncProcess
 from music_assistant.server.helpers.util import get_ips
 from music_assistant.server.helpers.webserver import Webserver
 from music_assistant.server.models.core_controller import CoreController
@@ -65,28 +64,30 @@ if TYPE_CHECKING:
 DEFAULT_STREAM_HEADERS = {
     "transferMode.dlna.org": "Streaming",
     "contentFeatures.dlna.org": "DLNA.ORG_OP=00;DLNA.ORG_CI=0;DLNA.ORG_FLAGS=0d500000000000000000000000000000",  # noqa: E501
-    "Cache-Control": "no-cache",
+    "Cache-Control": "no-cache,must-revalidate",
+    "Pragma": "no-cache",
     "Connection": "close",
     "Accept-Ranges": "none",
-    "icy-name": "Music Assistant",
-    "icy-pub": "0",
+    "Icy-Name": "Music Assistant",
+    "Icy-Url": "https://music-assistant.io",
 }
-FLOW_MAX_SAMPLE_RATE = 96000
-FLOW_MAX_BIT_DEPTH = 24
+FLOW_DEFAULT_SAMPLE_RATE = 48000
+FLOW_DEFAULT_BIT_DEPTH = 24
 
 # pylint:disable=too-many-locals
 
 
-class MultiClientQueueStreamJob:
-    """Representation of a (multiclient) Audio Queue stream job/task.
+class QueueStreamJob:
+    """
+    Representation of a (multiclient) Audio stream job/task.
 
-    The whole idea here is that the queue stream audio can be sent to multiple
+    The whole idea here is that the (pcm) audio source can be sent to multiple
     players at once. For example for (slimproto/airplay) syncgroups and universal group.
-    all client players receive the exact same audio chunks from the source audio,
-    encoded and/or resampled to the player's preferences.
-    A StreamJob is tied to a Queue and streams the queue flow stream,
+
+    All client players receive the exact same audio chunks from the source audio,
+    then encoded and/or resampled to the player's preferences.
     In case a stream is restarted (e.g. when seeking),
-    a new MultiClientQueueStreamJob will be created.
+    a new QueueStreamJob will be created.
     """
 
     _audio_task: asyncio.Task | None = None
@@ -96,21 +97,23 @@ class MultiClientQueueStreamJob:
         mass: MusicAssistant,
         pcm_audio_source: AsyncGenerator[bytes, None],
         pcm_format: AudioFormat,
-        expected_players: set[str],
+        auto_start: bool = False,
     ) -> None:
-        """Initialize MultiClientQueueStreamJob instance."""
+        """Initialize QueueStreamJob instance."""
         self.mass = mass
         self.pcm_audio_source = pcm_audio_source
         self.pcm_format = pcm_format
-        self.expected_players = expected_players
+        self.expected_players: set[str] = set()
         self.job_id = shortuuid.uuid()
         self.bytes_streamed: int = 0
         self.logger = self.mass.streams.logger.getChild(f"stream_job.{self.job_id}")
-        self._subscribed_players: dict[str, asyncio.Queue] = {}
+        self._subscribed_players: dict[str, AsyncProcess] = {}
         self._finished = False
         self._running = False
-        self._allow_start = asyncio.Event()
+        self.allow_start = asyncio.Event()
         self._audio_task = asyncio.create_task(self._stream_job_runner())
+        if auto_start:
+            self.allow_start.set()
 
     @property
     def finished(self) -> bool:
@@ -120,7 +123,7 @@ class MultiClientQueueStreamJob:
     @property
     def pending(self) -> bool:
         """Return if this Job is pending start."""
-        return not self.finished and not self._audio_task
+        return not self.finished and not self.running
 
     @property
     def running(self) -> bool:
@@ -131,84 +134,122 @@ class MultiClientQueueStreamJob:
         """Start running (send audio chunks to connected players)."""
         if self.finished:
             raise RuntimeError("Task is already finished")
-        self._allow_start.set()
+        self.allow_start.set()
 
     def stop(self) -> None:
         """Stop running this job."""
-        if self._audio_task and self._audio_task.done():
-            return
-        if self._audio_task:
+        if self._audio_task and not self._audio_task.done():
             self._audio_task.cancel()
         self._finished = True
 
-    def resolve_stream_url(self, child_player_id: str, output_codec: ContentType) -> str:
+    def resolve_stream_url(self, player_id: str, output_codec: ContentType) -> str:
         """Resolve the childplayer specific stream URL to this streamjob."""
         fmt = output_codec.value
         # handle raw pcm
         if output_codec.is_pcm():
-            player = self.mass.streams.mass.players.get(child_player_id)
+            player = self.mass.streams.mass.players.get(player_id)
             player_max_bit_depth = 24 if player.supports_24bit else 16
             output_sample_rate = min(self.pcm_format.sample_rate, player.max_sample_rate)
             output_bit_depth = min(self.pcm_format.bit_depth, player_max_bit_depth)
             output_channels = self.mass.config.get_raw_player_config_value(
-                child_player_id, CONF_OUTPUT_CHANNELS, "stereo"
+                player_id, CONF_OUTPUT_CHANNELS, "stereo"
             )
             channels = 1 if output_channels != "stereo" else 2
             fmt += (
                 f";codec=pcm;rate={output_sample_rate};"
                 f"bitrate={output_bit_depth};channels={channels}"
             )
-        url = f"{self.mass.streams._server.base_url}/multi/{self.job_id}/{child_player_id}.{fmt}"
-        self.expected_players.add(child_player_id)
+        url = f"{self.mass.streams._server.base_url}/flow/{self.job_id}/{player_id}.{fmt}"
+        self.expected_players.add(player_id)
         return url
 
-    async def subscribe(
+    async def iter_player_audio(
         self, player_id: str, output_format: AudioFormat, chunk_size: int | None = None
     ) -> AsyncGenerator[bytes, None]:
-        """Subscribe consumer and iterate chunks on the queue encoded to given output format."""
-        async for chunk in get_ffmpeg_stream(
-            audio_input=self._subscribe_pcm(player_id),
+        """Subscribe consumer and iterate player-specific audio."""
+        ffmpeg_args = get_ffmpeg_args(
             input_format=self.pcm_format,
             output_format=output_format,
             filter_params=get_player_filter_params(self.mass, player_id),
-            chunk_size=chunk_size,
-        ):
-            yield chunk
+            extra_args=[],
+            input_path="-",
+            output_path="-",
+            loglevel="info" if self.logger.isEnabledFor(VERBOSE_LOG_LEVEL) else "quiet",
+        )
+        # launch ffmpeg process with player specific settings
+        # the stream_job_runner will start pushing pcm chunks to the stdin
+        # we then read the players-specific (encoded) output chunks
+        # from ffmpeg stdout and yield them
+        async with AsyncProcess(
+            ffmpeg_args, enable_stdin=True, enable_stdout=True, enable_stderr=False
+        ) as ffmpeg_proc, self.subscribe(player_id, ffmpeg_proc):
+            # read final chunks from ffmpeg's stdout
+            iterator = (
+                ffmpeg_proc.iter_chunked(chunk_size) if chunk_size else ffmpeg_proc.iter_any()
+            )
+            async for chunk in iterator:
+                try:
+                    yield chunk
+                except (BrokenPipeError, ConnectionResetError):
+                    # race condition?
+                    break
 
-    async def _subscribe_pcm(self, player_id: str) -> AsyncGenerator[bytes, None]:
-        """Subscribe consumer and iterate incoming (raw pcm) chunks on the queue."""
+    async def stream_to_custom_output_path(
+        self, player_id: str, output_format: AudioFormat, output_path: str
+    ) -> None:
+        """Subscribe consumer and instruct ffmpeg to send the audio to the given output path."""
+        ffmpeg_args = get_ffmpeg_args(
+            input_format=self.pcm_format,
+            output_format=output_format,
+            filter_params=get_player_filter_params(self.mass, player_id),
+            extra_args=[],
+            input_path="-",
+            output_path=output_path,
+            loglevel="info" if self.logger.isEnabledFor(VERBOSE_LOG_LEVEL) else "quiet",
+        )
+        # launch ffmpeg process with player specific settings
+        # the stream_job_runner will start pushing pcm chunks to the stdin
+        # the ffmpeg process will send the output directly to the given path (e.g. tcp socket)
+        async with AsyncProcess(
+            ffmpeg_args,
+            enable_stdin=True,
+            enable_stdout=False,
+            enable_stderr=False,
+        ) as ffmpeg_proc, self.subscribe(player_id, ffmpeg_proc):
+            # we simply wait for the process to exit
+            await ffmpeg_proc.wait()
+
+    @asynccontextmanager
+    async def subscribe(
+        self, player_id: str, ffmpeg_proc: AsyncProcess
+    ) -> AsyncGenerator[QueueStreamJob]:
+        """Subscribe consumer's (output) ffmpeg process."""
+        if self.running:
+            # client subscribes while we're already started
+            # that will probably cause side effects but let it go
+            self.logger.warning(
+                "Player %s is joining while the stream is already started!", player_id
+            )
         try:
-            self._subscribed_players[player_id] = queue = asyncio.Queue(2)
-
-            if self.running:
-                # client subscribes while we're already started
-                # that will probably cause side effects but let it go
-                self.logger.warning(
-                    "Player %s is joining while the stream is already started!", player_id
-                )
-            else:
-                self.logger.debug("Subscribed player %s", player_id)
-
-            # yield from queue until finished
-            while not self._finished:
-                yield await queue.get()
+            self._subscribed_players[player_id] = ffmpeg_proc
+            self.logger.debug("Subscribed player %s", player_id)
+            yield self
         finally:
-            if sub_queue := self._subscribed_players.pop(player_id, None):
-                empty_queue(sub_queue)
+            self._subscribed_players.pop(player_id, None)
             self.logger.debug("Unsubscribed client %s", player_id)
             # check if this was the last subscriber and we should cancel
-            await asyncio.sleep(2)
+            await asyncio.sleep(5)
             if len(self._subscribed_players) == 0 and not self.finished:
                 self.logger.debug("Cleaning up, all clients disappeared...")
                 self.stop()
 
     async def _stream_job_runner(self) -> None:
         """Feed audio chunks to StreamJob subscribers."""
-        await self._allow_start.wait()
+        await self.allow_start.wait()
         retries = 50
         while retries:
             retries -= 1
-            await asyncio.sleep(0.2)
+            await asyncio.sleep(0.1)
             if len(self._subscribed_players) != len(self.expected_players):
                 continue
             await asyncio.sleep(0.2)
@@ -217,15 +258,24 @@ class MultiClientQueueStreamJob:
             break
 
         self.logger.debug(
-            "Starting multi client stream job %s with %s out of %s connected clients",
+            "Starting stream job %s with %s out of %s connected clients",
             self.job_id,
             len(self._subscribed_players),
             len(self.expected_players),
         )
         async for chunk in self.pcm_audio_source:
+            num_subscribers = len(self._subscribed_players)
+            if num_subscribers == 0:
+                break
             async with asyncio.TaskGroup() as tg:
-                for listener_queue in list(self._subscribed_players.values()):
-                    tg.create_task(listener_queue.put(chunk))
+                for ffmpeg_proc in list(self._subscribed_players.values()):
+                    tg.create_task(ffmpeg_proc.write(chunk))
+
+        # write EOF at end of queue stream
+        async with asyncio.TaskGroup() as tg:
+            for ffmpeg_proc in list(self._subscribed_players.values()):
+                tg.create_task(ffmpeg_proc.write_eof())
+        self.logger.debug("Finished stream job %s", self.job_id)
         self._finished = True
 
 
@@ -249,12 +299,12 @@ class StreamsController(CoreController):
         """Initialize instance."""
         super().__init__(*args, **kwargs)
         self._server = Webserver(self.logger, enable_dynamic_routes=True)
-        self.multi_client_jobs: dict[str, MultiClientQueueStreamJob] = {}
+        self.stream_jobs: dict[str, QueueStreamJob] = {}
         self.register_dynamic_route = self._server.register_dynamic_route
         self.unregister_dynamic_route = self._server.unregister_dynamic_route
         self.manifest.name = "Streamserver"
         self.manifest.description = (
-            "Music Assistant's core server that is responsible for "
+            "Music Assistant's core controller that is responsible for "
             "streaming audio to players on the local network as well as "
             "some player specific local control callbacks."
         )
@@ -339,12 +389,7 @@ class StreamsController(CoreController):
             static_routes=[
                 (
                     "*",
-                    "/multi/{job_id}/{player_id}.{fmt}",
-                    self.serve_multi_subscriber_stream,
-                ),
-                (
-                    "*",
-                    "/flow/{queue_id}/{queue_item_id}.{fmt}",
+                    "/flow/{job_id}/{player_id}.{fmt}",
                     self.serve_queue_flow_stream,
                 ),
                 (
@@ -369,7 +414,7 @@ class StreamsController(CoreController):
         """Cleanup on exit."""
         await self._server.close()
 
-    async def resolve_stream_url(
+    def resolve_stream_url(
         self,
         player_id: str,
         queue_item: QueueItem,
@@ -381,16 +426,35 @@ class StreamsController(CoreController):
         # handle announcement item
         if queue_item.media_type == MediaType.ANNOUNCEMENT:
             return queue_item.queue_item_id
-        # handle request for multi client queue stream
-        stream_job = self.multi_client_jobs.get(queue_item.queue_id)
-        if queue_item.queue_item_id == "flow" or stream_job and stream_job.pending:
+        # handle request for (multi client) queue flow stream
+        if queue_item.queue_item_id in ("flow", queue_item.queue_id) or flow_mode:
+            # note: this will return an existing streamjob if that was already created
+            # e.g. in case of universal group player
+            pcm_format = AudioFormat(
+                content_type=ContentType.from_bit_depth(24),
+                sample_rate=FLOW_DEFAULT_SAMPLE_RATE,
+                bit_depth=FLOW_DEFAULT_BIT_DEPTH,
+            )
+            stream_job = self.create_stream_job(
+                queue_item.queue_id,
+                pcm_audio_source=self.get_flow_stream(
+                    self.mass.player_queues.get(queue_item.queue_id),
+                    start_queue_item=queue_item,
+                    pcm_format=pcm_format,
+                ),
+                pcm_format=pcm_format,
+                auto_start=True,
+            )
+
             return stream_job.resolve_stream_url(player_id, output_codec)
+
         # handle raw pcm without exact format specifiers
         if output_codec.is_pcm() and ";" not in fmt:
             fmt += f";codec=pcm;rate={44100};bitrate={16};channels={2}"
         query_params = {}
-        base_path = "flow" if flow_mode else "single"
-        url = f"{self._server.base_url}/{base_path}/{queue_item.queue_id}/{queue_item.queue_item_id}.{fmt}"  # noqa: E501
+        url = (
+            f"{self._server.base_url}/single/{queue_item.queue_id}/{queue_item.queue_item_id}.{fmt}"
+        )
         # we add a timestamp as basic checksum
         # most importantly this is to invalidate any caches
         # but also to handle edge cases such as single track repeat
@@ -398,41 +462,29 @@ class StreamsController(CoreController):
         url += "?" + urllib.parse.urlencode(query_params)
         return url
 
-    async def create_multi_client_stream_job(
+    def create_stream_job(
         self,
         queue_id: str,
-        start_queue_item: QueueItem,
-        pcm_bit_depth: int = 24,
-        pcm_sample_rate: int = 48000,
-        expected_players: set[str] | None = None,
-    ) -> MultiClientQueueStreamJob:
+        pcm_audio_source: AsyncGenerator[bytes, None],
+        pcm_format: AudioFormat,
+        auto_start: bool = False,
+    ) -> QueueStreamJob:
         """
-        Create a MultiClientQueueStreamJob for the given queue..
+        Create a QueueStreamJob for the given queue.
 
         This is called by player/sync group implementations to start streaming
         the queue audio to multiple players at once.
         """
-        if existing_job := self.multi_client_jobs.get(queue_id, None):
+        if existing_job := self.stream_jobs.get(queue_id, None):
             if existing_job.pending:
                 return existing_job
             # cleanup existing job first
             existing_job.stop()
-        queue = self.mass.player_queues.get(queue_id)
-        pcm_format = AudioFormat(
-            content_type=ContentType.from_bit_depth(pcm_bit_depth),
-            sample_rate=pcm_sample_rate,
-            bit_depth=pcm_bit_depth,
-            channels=2,
-        )
-        self.multi_client_jobs[queue_id] = stream_job = MultiClientQueueStreamJob(
+        self.stream_jobs[queue_id] = stream_job = QueueStreamJob(
             self.mass,
-            pcm_audio_source=self.get_flow_stream(
-                queue=queue,
-                start_queue_item=start_queue_item,
-                pcm_format=pcm_format,
-            ),
+            pcm_audio_source=pcm_audio_source,
             pcm_format=pcm_format,
-            expected_players=expected_players or set(),
+            auto_start=auto_start,
         )
         return stream_job
 
@@ -507,99 +559,8 @@ class StreamsController(CoreController):
     async def serve_queue_flow_stream(self, request: web.Request) -> web.Response:
         """Stream Queue Flow audio to player."""
         self._log_request(request)
-        queue_id = request.match_info["queue_id"]
-        if not (queue := self.mass.player_queues.get(queue_id)):
-            raise web.HTTPNotFound(reason=f"Unknown Queue: {queue_id}")
-        start_queue_item_id = request.match_info["queue_item_id"]
-        start_queue_item = self.mass.player_queues.get_item(queue_id, start_queue_item_id)
-        if not start_queue_item:
-            raise web.HTTPNotFound(reason=f"Unknown Queue item: {start_queue_item_id}")
-        queue_player = self.mass.players.get(queue_id)
-        # work out output format/details
-        output_format = await self._get_output_format(
-            output_format_str=request.match_info["fmt"],
-            queue_player=queue_player,
-            default_sample_rate=FLOW_MAX_SAMPLE_RATE,
-            default_bit_depth=FLOW_MAX_BIT_DEPTH,
-        )
-        # prepare request, add some DLNA/UPNP compatible headers
-        enable_icy = request.headers.get("Icy-MetaData", "") == "1"
-        icy_meta_interval = 16384 * 4 if output_format.content_type.is_lossless() else 16384
-        headers = {
-            **DEFAULT_STREAM_HEADERS,
-            "Content-Type": f"audio/{output_format.output_format_str}",
-        }
-        if enable_icy:
-            headers["icy-metaint"] = str(icy_meta_interval)
-
-        resp = web.StreamResponse(
-            status=200,
-            reason="OK",
-            headers=headers,
-        )
-        await resp.prepare(request)
-
-        # return early if this is not a GET request
-        if request.method != "GET":
-            return resp
-
-        # all checks passed, start streaming!
-        self.logger.debug("Start serving Queue flow audio stream for %s", queue_player.name)
-
-        pcm_format = AudioFormat(
-            content_type=ContentType.from_bit_depth(output_format.bit_depth),
-            sample_rate=output_format.sample_rate,
-            bit_depth=output_format.bit_depth,
-            channels=2,
-        )
-        async for chunk in get_ffmpeg_stream(
-            audio_input=self.get_flow_stream(
-                queue=queue,
-                start_queue_item=start_queue_item,
-                pcm_format=pcm_format,
-            ),
-            input_format=pcm_format,
-            output_format=output_format,
-            filter_params=get_player_filter_params(self.mass, queue_player.player_id),
-            chunk_size=icy_meta_interval if enable_icy else None,
-        ):
-            try:
-                await resp.write(chunk)
-            except (BrokenPipeError, ConnectionResetError):
-                break
-
-            if not enable_icy:
-                continue
-
-            # if icy metadata is enabled, send the icy metadata after the chunk
-            if (
-                # use current item here and not buffered item, otherwise
-                # the icy metadata will be too much ahead
-                (current_item := queue.current_item)
-                and current_item.streamdetails
-                and current_item.streamdetails.stream_title
-            ):
-                title = current_item.streamdetails.stream_title
-            elif queue and current_item and current_item.name:
-                title = current_item.name
-            else:
-                title = "Music Assistant"
-            metadata = f"StreamTitle='{title}';".encode()
-            if current_item and current_item.image:
-                metadata += f"StreamURL='{current_item.image.path}'".encode()
-            while len(metadata) % 16 != 0:
-                metadata += b"\x00"
-            length = len(metadata)
-            length_b = chr(int(length / 16)).encode()
-            await resp.write(length_b + metadata)
-
-        return resp
-
-    async def serve_multi_subscriber_stream(self, request: web.Request) -> web.Response:
-        """Stream Queue Flow audio to a child player within a multi subscriber setup."""
-        self._log_request(request)
         job_id = request.match_info["job_id"]
-        for queue_id, stream_job in self.multi_client_jobs.items():
+        for queue_id, stream_job in self.stream_jobs.items():
             if stream_job.job_id == job_id:
                 break
         else:
@@ -608,7 +569,6 @@ class StreamsController(CoreController):
             raise web.HTTPNotFound(reason=f"StreamJob {job_id} already finished")
         if not (queue := self.mass.player_queues.get(queue_id)):
             raise web.HTTPNotFound(reason=f"Unknown Queue: {queue_id}")
-
         player_id = request.match_info["player_id"]
         child_player = self.mass.players.get(player_id)
         if not child_player:
@@ -643,11 +603,11 @@ class StreamsController(CoreController):
 
         # all checks passed, start streaming!
         self.logger.debug(
-            "Start serving multi-subscriber Queue flow audio stream for queue %s to player %s",
+            "Start serving Queue flow audio stream for queue %s to player %s",
             queue.display_name,
             child_player.display_name,
         )
-        async for chunk in stream_job.subscribe(
+        async for chunk in stream_job.iter_player_audio(
             player_id, output_format, chunk_size=icy_meta_interval if enable_icy else None
         ):
             try:
@@ -784,7 +744,6 @@ class StreamsController(CoreController):
         assert pcm_format.content_type.is_pcm()
         queue_track = None
         last_fadeout_part = b""
-        total_bytes_written = 0
         queue.flow_mode = True
         use_crossfade = self.mass.config.get_raw_player_config_value(
             queue.queue_id, CONF_CROSSFADE, False
@@ -845,6 +804,7 @@ class StreamsController(CoreController):
             ):
                 # ALWAYS APPEND CHUNK TO BUFFER
                 buffer += chunk
+                del chunk
                 if len(buffer) < buffer_size:
                     # buffer is not full enough, move on
                     continue
@@ -874,10 +834,9 @@ class StreamsController(CoreController):
 
                 #### OTHER: enough data in buffer, feed to output
                 else:
-                    chunk_size = len(chunk)
-                    yield buffer[:chunk_size]
-                    bytes_written += chunk_size
-                    buffer = buffer[chunk_size:]
+                    yield buffer[:pcm_sample_size]
+                    bytes_written += pcm_sample_size
+                    buffer = buffer[pcm_sample_size:]
 
             #### HANDLE END OF TRACK
             if last_fadeout_part:
@@ -896,28 +855,27 @@ class StreamsController(CoreController):
                 # no crossfade enabled, just yield the (entire) buffer last part
                 yield buffer
                 bytes_written += len(buffer)
-            # clear vars
-            buffer = b""
 
             # update duration details based on the actual pcm data we sent
             # this also accounts for crossfade and silence stripping
-            queue_track.streamdetails.seconds_streamed = bytes_written / pcm_sample_size
+            seconds_streamed = (bytes_written + len(last_fadeout_part)) / pcm_sample_size
+            queue_track.streamdetails.seconds_streamed = seconds_streamed
             queue_track.streamdetails.duration = (
-                queue_track.streamdetails.seconds_skipped
-                or 0 + queue_track.streamdetails.seconds_streamed
+                queue_track.streamdetails.seek_position + seconds_streamed
             )
-            total_bytes_written += bytes_written
             self.logger.debug(
                 "Finished Streaming queue track: %s (%s) on queue %s - seconds streamed: %s",
                 queue_track.streamdetails.uri,
                 queue_track.name,
                 queue.display_name,
-                queue_track.streamdetails.seconds_streamed,
+                seconds_streamed,
             )
 
         # end of queue flow: make sure we yield the last_fadeout_part
         if last_fadeout_part:
             yield last_fadeout_part
+            del last_fadeout_part
+        del buffer
         self.logger.info("Finished Queue Flow stream for Queue %s", queue.display_name)
 
     def _log_request(self, request: web.Request) -> None:
index 6140b035b7d1702d917a68032a82f697ba088afe..12a0987c3ce32ebb0d79d640121a55bf11623536 100644 (file)
@@ -163,13 +163,13 @@ class WebserverController(CoreController):
         # also host the audio preview service
         routes.append(("GET", "/preview", self.serve_preview_stream))
         # start the webserver
+        default_publish_ip = await get_ip()
         if self.mass.running_as_hass_addon:
             # if we're running on the HA supervisor the webserver is secured by HA ingress
             # we only start the webserver on the internal docker network and ingress connects
             # to that internally and exposes the webUI securely
             # if a user also wants to expose a the webserver non securely on his internal
             # network he/she should explicitly do so (and know the risks)
-            default_publish_ip = await get_ip()
             self.publish_port = DEFAULT_SERVER_PORT
             if config.get_value(CONF_EXPOSE_SERVER):
                 bind_ip = "0.0.0.0"
@@ -183,7 +183,8 @@ class WebserverController(CoreController):
         else:
             base_url = config.get_value(CONF_BASE_URL)
             self.publish_port = config.get_value(CONF_BIND_PORT)
-            self.publish_ip = bind_ip = config.get_value(CONF_BIND_IP)
+            self.publish_ip = default_publish_ip
+            bind_ip = config.get_value(CONF_BIND_IP)
         await self._server.setup(
             bind_ip=bind_ip,
             bind_port=self.publish_port,
index 32f21912929b85948af2cd3d9cd08f44c213c3ec..cb2c9857c0c186b4c7bc9db004845b6fc48ebc7c 100644 (file)
@@ -192,7 +192,7 @@ async def analyze_loudness(mass: MusicAssistant, streamdetails: StreamDetails) -
         item_name = f"{streamdetails.provider}/{streamdetails.item_id}"
         LOGGER.debug("Start analyzing EBU R128 loudness for %s", item_name)
         # calculate EBU R128 integrated loudness with ffmpeg
-        ffmpeg_args = _get_ffmpeg_args(
+        ffmpeg_args = get_ffmpeg_args(
             input_format=streamdetails.audio_format,
             output_format=streamdetails.audio_format,
             filter_params=["loudnorm=print_format=json"],
@@ -215,7 +215,7 @@ async def analyze_loudness(mass: MusicAssistant, streamdetails: StreamDetails) -
                     if chunk_count == 600:
                         # safety guard: max (more or less) 10 minutes of audio may be analyzed!
                         break
-                ffmpeg_proc.write_eof()
+                await ffmpeg_proc.write_eof()
 
             _, stderr = await ffmpeg_proc.communicate()
             if loudness_details := _parse_loudnorm(stderr):
@@ -392,7 +392,6 @@ async def get_media_stream(  # noqa: PLR0915
     """
     logger = LOGGER.getChild("media_stream")
     bytes_sent = 0
-    streamdetails.seconds_skipped = streamdetails.seek_position
     is_radio = streamdetails.media_type == MediaType.RADIO or not streamdetails.duration
     if is_radio or streamdetails.seek_position:
         strip_silence_begin = False
@@ -400,7 +399,7 @@ async def get_media_stream(  # noqa: PLR0915
     pcm_sample_size = int(pcm_format.sample_rate * (pcm_format.bit_depth / 8) * 2)
     chunk_size = pcm_sample_size * (1 if is_radio else 2)
     expected_chunks = int((streamdetails.duration or 0) / 2)
-    if expected_chunks < 60:
+    if expected_chunks < 10:
         strip_silence_end = False
 
     # collect all arguments for ffmpeg
@@ -424,7 +423,7 @@ async def get_media_stream(  # noqa: PLR0915
         filter_params.append(filter_rule)
     if streamdetails.fade_in:
         filter_params.append("afade=type=in:start_time=0:duration=3")
-    ffmpeg_args = _get_ffmpeg_args(
+    ffmpeg_args = get_ffmpeg_args(
         input_format=streamdetails.audio_format,
         output_format=pcm_format,
         filter_params=filter_params,
@@ -449,7 +448,7 @@ async def get_media_stream(  # noqa: PLR0915
         async for audio_chunk in music_prov.get_audio_stream(streamdetails, seek_pos):
             await ffmpeg_proc.write(audio_chunk)
         # write eof when last packet is received
-        ffmpeg_proc.write_eof()
+        await ffmpeg_proc.write_eof()
         logger.log(VERBOSE_LOG_LEVEL, "writer finished for %s", streamdetails.uri)
 
     if streamdetails.direct is None:
@@ -485,11 +484,9 @@ async def get_media_stream(  # noqa: PLR0915
             if prev_chunk:
                 yield prev_chunk
                 bytes_sent += len(prev_chunk)
-
             prev_chunk = chunk
 
-        # all chunks received, strip silence of last part
-
+        # all chunks received, strip silence of last part if needed and yield remaining bytes
         if strip_silence_end and prev_chunk:
             final_chunk = await strip_silence(
                 mass,
@@ -500,9 +497,6 @@ async def get_media_stream(  # noqa: PLR0915
             )
         else:
             final_chunk = prev_chunk
-
-        # ensure the final chunk is sent
-        # its important this is done here at the end so we can catch errors first
         yield final_chunk
         bytes_sent += len(final_chunk)
         del final_chunk
@@ -727,9 +721,9 @@ async def get_ffmpeg_stream(
     Takes care of resampling and/or recoding if needed,
     according to player preferences.
     """
-    logger = LOGGER.getChild("media_stream")
+    logger = LOGGER.getChild("ffmpeg_stream")
     use_stdin = not isinstance(audio_input, str)
-    ffmpeg_args = _get_ffmpeg_args(
+    ffmpeg_args = get_ffmpeg_args(
         input_format=input_format,
         output_format=output_format,
         filter_params=filter_params or [],
@@ -750,7 +744,7 @@ async def get_ffmpeg_stream(
             if ffmpeg_proc.closed:
                 return
             await ffmpeg_proc.write(chunk)
-        ffmpeg_proc.write_eof()
+        await ffmpeg_proc.write_eof()
 
     try:
         if not isinstance(audio_input, str):
@@ -768,7 +762,6 @@ async def get_ffmpeg_stream(
         if writer_task and not writer_task.done():
             writer_task.cancel()
         # use communicate to read stderr and wait for exit
-        # read log for loudness measurement (or errors)
         _, stderr = await ffmpeg_proc.communicate()
         if ffmpeg_proc.returncode != 0:
             # ffmpeg has a non zero returncode meaning trouble
@@ -787,7 +780,7 @@ async def check_audio_support() -> tuple[bool, bool, str]:
     version = output.decode().split("ffmpeg version ")[1].split(" ")[0].split("-")[0]
     libsoxr_support = "enable-libsoxr" in output.decode()
     result = (ffmpeg_present, libsoxr_support, version)
-    # store in global cache for easy access by '_get_ffmpeg_args'
+    # store in global cache for easy access by 'get_ffmpeg_args'
     await set_global_cache_values({"ffmpeg_support": result})
     return result
 
@@ -830,7 +823,7 @@ async def get_preview_stream(
         async for audio_chunk in music_prov.get_audio_stream(streamdetails, 30):
             await ffmpeg_proc.write(audio_chunk)
         # write eof when last packet is received
-        ffmpeg_proc.write_eof()
+        await ffmpeg_proc.write_eof()
 
     if not streamdetails.direct:
         writer_task = asyncio.create_task(writer())
@@ -935,13 +928,14 @@ def get_player_filter_params(
     return filter_params
 
 
-def _get_ffmpeg_args(
+def get_ffmpeg_args(
     input_format: AudioFormat,
     output_format: AudioFormat,
     filter_params: list[str],
     extra_args: list[str],
     input_path: str = "-",
     output_path: str = "-",
+    loglevel: str = "info",
 ) -> list[str]:
     """Collect all args to send to the ffmpeg process."""
     ffmpeg_present, libsoxr_support, version = get_global_cache_value("ffmpeg_support")
@@ -962,7 +956,7 @@ def _get_ffmpeg_args(
         "ffmpeg",
         "-hide_banner",
         "-loglevel",
-        "info",
+        loglevel,
         "-ignore_unknown",
         "-protocol_whitelist",
         "file,http,https,tcp,tls,crypto,pipe,data,fd",
index a057abc2a2f9e4dcbc7ddf017ca82eab831016d1..2700aaf65b1d943d4699be49faeac6535ec5741f 100644 (file)
@@ -3,6 +3,7 @@
 from __future__ import annotations
 
 import asyncio
+from types import TracebackType
 from typing import TYPE_CHECKING
 
 from aiohttp.web import Request, Response
@@ -40,7 +41,12 @@ class AuthenticationHelper:
         )
         return self
 
-    async def __aexit__(self, exc_type, exc_value, traceback) -> bool:
+    async def __aexit__(
+        self,
+        exc_type: type[BaseException] | None,
+        exc_val: BaseException | None,
+        exc_tb: TracebackType | None,
+    ) -> bool | None:
         """Exit context manager."""
         self.mass.streams.unregister_dynamic_route(f"/callback/{self.session_id}", "GET")
 
index 1e9c5272b5e70713d5aa9936300e5ccee0ca0a66..db40ac748d23df48a4d7a34731c0571c271a961c 100644 (file)
@@ -1,14 +1,18 @@
-"""Implementation of a (truly) non blocking subprocess.
+"""
+AsyncProcess.
 
-The subprocess implementation in asyncio can (still) sometimes cause deadlocks,
-even when properly handling reading/writes from different tasks.
+Wrapper around asyncio subprocess to help with using pipe streams and
+taking care of properly closing the process in case of exit (on both success and failures),
+without deadlocking.
 """
 
 from __future__ import annotations
 
 import asyncio
 import logging
+import os
 from contextlib import suppress
+from types import TracebackType
 from typing import TYPE_CHECKING
 
 if TYPE_CHECKING:
@@ -22,73 +26,101 @@ DEFAULT_CHUNKSIZE = 128000
 
 
 class AsyncProcess:
-    """Implementation of a (truly) non blocking subprocess."""
+    """
+    AsyncProcess.
+
+    Wrapper around asyncio subprocess to help with using pipe streams and
+    taking care of properly closing the process in case of exit (on both success and failures),
+    without deadlocking.
+    """
 
     def __init__(
         self,
-        args: list,
+        args: list[str],
         enable_stdin: bool = False,
         enable_stdout: bool = True,
         enable_stderr: bool = False,
     ) -> None:
-        """Initialize."""
-        self._proc = None
+        """Initialize AsyncProcess."""
+        self.proc: asyncio.subprocess.Process | None = None
         self._args = args
         self._enable_stdin = enable_stdin
         self._enable_stdout = enable_stdout
         self._enable_stderr = enable_stderr
+        self._close_called = False
+        self._stdin_lock = asyncio.Lock()
+        self._stdout_lock = asyncio.Lock()
+        self._stderr_lock = asyncio.Lock()
+        self._returncode: bool | None = None
 
     @property
     def closed(self) -> bool:
         """Return if the process was closed."""
-        return self.returncode is not None
+        return self._close_called or self.returncode is not None
 
     @property
     def returncode(self) -> int | None:
         """Return the erturncode of the process."""
-        if self._proc is None:
+        if self._returncode is not None:
+            return self._returncode
+        if self.proc is None:
             return None
-        return self._proc.returncode
+        return self.proc.returncode
 
     async def __aenter__(self) -> AsyncProcess:
         """Enter context manager."""
         await self.start()
         return self
 
-    async def __aexit__(self, exc_type, exc_value, traceback) -> bool:
+    async def __aexit__(
+        self,
+        exc_type: type[BaseException] | None,
+        exc_val: BaseException | None,
+        exc_tb: TracebackType | None,
+    ) -> bool | None:
         """Exit context manager."""
         await self.close()
+        self._returncode = self.returncode
+        del self.proc
+        del self._stdin_lock
+        del self._stdout_lock
+        del self._returncode
 
     async def start(self) -> None:
         """Perform Async init of process."""
-        self._proc = await asyncio.create_subprocess_exec(
+        self.proc = await asyncio.create_subprocess_exec(
             *self._args,
             stdin=asyncio.subprocess.PIPE if self._enable_stdin else None,
             stdout=asyncio.subprocess.PIPE if self._enable_stdout else None,
             stderr=asyncio.subprocess.PIPE if self._enable_stderr else None,
             close_fds=True,
         )
+        proc_name_simple = self._args[0].split(os.sep)[-1]
+        LOGGER.debug("Started %s with PID %s", proc_name_simple, self.proc.pid)
 
     async def iter_chunked(self, n: int = DEFAULT_CHUNKSIZE) -> AsyncGenerator[bytes, None]:
         """Yield chunks of n size from the process stdout."""
         while True:
             chunk = await self.readexactly(n)
-            yield chunk
-            if len(chunk) < n:
+            if len(chunk) == 0:
                 break
+            yield chunk
 
     async def iter_any(self, n: int = DEFAULT_CHUNKSIZE) -> AsyncGenerator[bytes, None]:
         """Yield chunks as they come in from process stdout."""
         while True:
             chunk = await self.read(n)
-            if chunk == b"":
+            if len(chunk) == 0:
                 break
             yield chunk
 
     async def readexactly(self, n: int) -> bytes:
         """Read exactly n bytes from the process stdout (or less if eof)."""
+        if self._close_called or self.proc.stdout.at_eof():
+            return b""
         try:
-            return await self._proc.stdout.readexactly(n)
+            async with self._stdout_lock:
+                return await self.proc.stdout.readexactly(n)
         except asyncio.IncompleteReadError as err:
             return err.partial
 
@@ -99,25 +131,36 @@ class AsyncProcess:
         and may return fewer than (or equal to) the requested number of bytes, but at least one byte.
         If EOF was received before any byte is read, this function returns empty byte object.
         """
-        return await self._proc.stdout.read(n)
+        if self._close_called or self.proc.stdout.at_eof():
+            return b""
+        if self.proc.stdout.at_eof():
+            return b""
+        async with self._stdout_lock:
+            return await self.proc.stdout.read(n)
 
     async def write(self, data: bytes) -> None:
         """Write data to process stdin."""
-        if self.closed or self._proc.stdin.is_closing():
+        if self._close_called or self.proc.stdin.is_closing():
             return
-        self._proc.stdin.write(data)
-        with suppress(BrokenPipeError):
-            await self._proc.stdin.drain()
-
-    def write_eof(self) -> None:
+        if not self.proc or self.proc.returncode is not None:
+            raise RuntimeError("Process not started or already exited")
+        async with self._stdin_lock:
+            self.proc.stdin.write(data)
+            with suppress(BrokenPipeError):
+                await self.proc.stdin.drain()
+
+    async def write_eof(self) -> None:
         """Write end of file to to process stdin."""
         if not self._enable_stdin:
-            return
-        if self.closed or self._proc.stdin.is_closing():
+            raise RuntimeError("STDIN is not enabled")
+        if not self.proc or self.proc.returncode is not None:
+            raise RuntimeError("Process not started or already exited")
+        if self._close_called or self.proc.stdin.is_closing():
             return
         try:
-            if self._proc.stdin.can_write_eof():
-                self._proc.stdin.write_eof()
+            async with self._stdin_lock:
+                if self.proc.stdin.can_write_eof():
+                    self.proc.stdin.write_eof()
         except (
             AttributeError,
             AssertionError,
@@ -130,32 +173,45 @@ class AsyncProcess:
 
     async def close(self) -> int:
         """Close/terminate the process and wait for exit."""
-        if self.returncode is not None:
-            return self.returncode
-        # make sure the process is cleaned up
-        self._proc.terminate()
-        try:
-            async with asyncio.timeout(10):
-                await self.communicate()
-        except (TimeoutError, asyncio.CancelledError):
-            self._proc.kill()
-        return await self.wait()
+        self._close_called = True
+        if self.proc.returncode is None:
+            # make sure the process is cleaned up
+            try:
+                # we need to use communicate to ensure buffers are flushed
+                await asyncio.wait_for(self.proc.communicate(), 5)
+            except TimeoutError:
+                LOGGER.debug(
+                    "Process with PID %s did not stop within 5 seconds. Sending terminate...",
+                    self.proc.pid,
+                )
+                self.proc.terminate()
+                await self.proc.communicate()
+        LOGGER.debug(
+            "Process with PID %s stopped with returncode %s", self.proc.pid, self.proc.returncode
+        )
+        return self.proc.returncode
 
     async def wait(self) -> int:
         """Wait for the process and return the returncode."""
         if self.returncode is not None:
             return self.returncode
-        return await self._proc.wait()
+        return await self.proc.wait()
 
     async def communicate(self, input_data: bytes | None = None) -> tuple[bytes, bytes]:
         """Write bytes to process and read back results."""
-        stdout, stderr = await self._proc.communicate(input_data)
+        if self.closed:
+            return (b"", b"")
+        async with self._stdout_lock, self._stdin_lock, self._stderr_lock:
+            stdout, stderr = await self.proc.communicate(input_data)
         return (stdout, stderr)
 
     async def read_stderr(self) -> AsyncGenerator[bytes, None]:
         """Read lines from the stderr stream."""
-        async for line in self._proc.stderr:
-            yield line
+        async with self._stderr_lock:
+            async for line in self.proc.stderr:
+                if self.closed:
+                    break
+                yield line
 
 
 async def check_output(shell_cmd: str) -> tuple[int, bytes]:
index d917ef306da6c09dda5260eb8552fbeb211ce72d..73cfeb1201fd6089bcfdca8c8d678cdf4cd8f95e 100644 (file)
@@ -451,7 +451,7 @@ async def get_embedded_image(input_file: str | AsyncGenerator[bytes, None]) -> b
             if ffmpeg_proc.closed:
                 break
             await ffmpeg_proc.write(chunk)
-        ffmpeg_proc.write_eof()
+        await ffmpeg_proc.write_eof()
 
     # feed the file contents to the process stdin
     if file_path == "-":
index 58faa52da66c5350b979c32b00f189ea06797954..0daee42e18d456bc1d0deda7d554e2748dc1fed9 100644 (file)
@@ -7,7 +7,6 @@ import os
 import platform
 import socket
 import time
-from collections.abc import AsyncGenerator
 from contextlib import suppress
 from dataclasses import dataclass
 from random import randint, randrange
@@ -42,12 +41,8 @@ from music_assistant.common.models.media_items import AudioFormat
 from music_assistant.common.models.player import DeviceInfo, Player
 from music_assistant.common.models.player_queue import PlayerQueue
 from music_assistant.constants import CONF_SYNC_ADJUST, VERBOSE_LOG_LEVEL
-from music_assistant.server.helpers.audio import (
-    get_ffmpeg_stream,
-    get_media_stream,
-    get_player_filter_params,
-)
-from music_assistant.server.helpers.process import check_output
+from music_assistant.server.helpers.audio import get_media_stream
+from music_assistant.server.helpers.process import AsyncProcess, check_output
 from music_assistant.server.models.player_provider import PlayerProvider
 from music_assistant.server.providers.ugp import UGP_PREFIX
 
@@ -56,6 +51,7 @@ if TYPE_CHECKING:
     from music_assistant.common.models.provider import ProviderManifest
     from music_assistant.common.models.queue_item import QueueItem
     from music_assistant.server import MusicAssistant
+    from music_assistant.server.controllers.streams import QueueStreamJob
     from music_assistant.server.models import ProviderInstanceType
 
 DOMAIN = "airplay"
@@ -109,6 +105,10 @@ CONF_CREDENTIALS = "credentials"
 CACHE_KEY_PREV_VOLUME = "airplay_prev_volume"
 FALLBACK_VOLUME = 20
 
+AIRPLAY_PCM_FORMAT = AudioFormat(
+    content_type=ContentType.from_bit_depth(16), sample_rate=44100, bit_depth=16
+)
+
 
 async def setup(
     mass: MusicAssistant, manifest: ProviderManifest, config: ProviderConfig
@@ -178,23 +178,23 @@ def get_primary_ip_address(discovery_info: AsyncServiceInfo) -> str | None:
     return None
 
 
-class AirplayStreamJob:
+class AirplayStream:
     """Object that holds the details of a stream job."""
 
     def __init__(self, prov: AirplayProvider, airplay_player: AirPlayPlayer) -> None:
-        """Initialize AirplayStreamJob."""
+        """Initialize AirplayStream."""
         self.prov = prov
         self.mass = prov.mass
         self.airplay_player = airplay_player
         # always generate a new active remote id to prevent race conditions
-        # with the named pipe used to send commands
+        # with the named pipe used to send audio
         self.active_remote_id: str = str(randint(1000, 8000))
         self.start_ntp: int | None = None  # use as checksum
         self.prevent_playback: bool = False
-        self._audio_iterator: AsyncGenerator[bytes, None] | None = None
+        self.stream_job: QueueStreamJob | None = None
         self._log_reader_task: asyncio.Task | None = None
         self._audio_reader_task: asyncio.Task | None = None
-        self._cliraop_proc: asyncio.subprocess.Process | None = None
+        self._cliraop_proc: AsyncProcess | None = None
         self._stop_requested = False
 
     @property
@@ -206,10 +206,10 @@ class AirplayStreamJob:
             and self._cliraop_proc.returncode is None
         )
 
-    async def start(self, start_ntp: int, audio_iterator: AsyncGenerator[bytes, None]) -> None:
+    async def start(self, start_ntp: int, stream_job: QueueStreamJob) -> None:
         """Initialize CLIRaop process for a player."""
         self.start_ntp = start_ntp
-        self._audio_iterator = audio_iterator
+        self.stream_job = stream_job
         extra_args = []
         player_id = self.airplay_player.player_id
         mass_player = self.mass.players.get(player_id)
@@ -220,7 +220,6 @@ class AirplayStreamJob:
         for prop in ("et", "md", "am", "pk", "pw"):
             if prop_value := self.airplay_player.discovery_info.decoded_properties.get(prop):
                 extra_args += [f"-{prop}", prop_value]
-
         sync_adjust = self.mass.config.get_raw_player_config_value(player_id, CONF_SYNC_ADJUST, 0)
         if device_password := self.mass.config.get_raw_player_config_value(
             player_id, CONF_PASSWORD, None
@@ -231,7 +230,7 @@ class AirplayStreamJob:
         elif self.prov.logger.isEnabledFor(VERBOSE_LOG_LEVEL):
             extra_args += ["-debug", "10"]
 
-        args = [
+        cliraop_args = [
             self.prov.cliraop_bin,
             "-ntpstart",
             str(start_ntp),
@@ -247,18 +246,20 @@ class AirplayStreamJob:
             "-activeremote",
             self.active_remote_id,
             "-udn",
-            str(self.airplay_player.discovery_info.name),
+            self.airplay_player.discovery_info.name,
             self.airplay_player.address,
             "-",
         ]
         if platform.system() == "Darwin":
             os.environ["DYLD_LIBRARY_PATH"] = "/usr/local/lib"
-        self._cliraop_proc = await asyncio.create_subprocess_exec(
-            *args,
-            stdin=asyncio.subprocess.PIPE,
-            stderr=asyncio.subprocess.PIPE,
-            close_fds=True,
+
+        self._cliraop_proc = AsyncProcess(
+            cliraop_args,
+            enable_stdin=True,
+            enable_stdout=False,
+            enable_stderr=True,
         )
+        await self._cliraop_proc.start()
         self._log_reader_task = asyncio.create_task(self._log_watcher())
         self._audio_reader_task = asyncio.create_task(self._audio_reader())
 
@@ -270,26 +271,22 @@ class AirplayStreamJob:
         # send stop with cli command
         await self.send_cli_command("ACTION=STOP")
 
-        async def wait_for_stop() -> None:
+        async def _stop() -> None:
             # always stop the audio feeder
             if self._audio_reader_task and not self._audio_reader_task.done():
                 with suppress(asyncio.CancelledError):
                     self._audio_reader_task.cancel()
-            # make sure stdin is drained (otherwise we'll deadlock)
-            if self._cliraop_proc and self._cliraop_proc.returncode is None:
-                if self._cliraop_proc.stdin.can_write_eof():
-                    self._cliraop_proc.stdin.write_eof()
-                with suppress(BrokenPipeError):
-                    await self._cliraop_proc.stdin.drain()
+            await self._cliraop_proc.write_eof()
+            # the process should exit gracefully after the stop request was processed
             await asyncio.wait_for(self._cliraop_proc.wait(), 30)
 
-        task = self.mass.create_task(wait_for_stop())
+        task = self.mass.create_task(_stop())
         if wait:
             await task
 
     async def send_cli_command(self, command: str) -> None:
         """Send an interactive command to the running CLIRaop binary."""
-        if not (self._cliraop_proc and self._cliraop_proc.returncode is None):
+        if not self._cliraop_proc or self._cliraop_proc.closed:
             return
 
         named_pipe = f"/tmp/fifo-{self.active_remote_id}"  # noqa: S108
@@ -309,7 +306,7 @@ class AirplayStreamJob:
         mass_player = self.mass.players.get(airplay_player.player_id)
         logger = airplay_player.logger
         lost_packets = 0
-        async for line in self._cliraop_proc.stderr:
+        async for line in self._cliraop_proc.read_stderr():
             line = line.decode().strip()  # noqa: PLW2901
             if not line:
                 continue
@@ -350,16 +347,12 @@ class AirplayStreamJob:
             logger.log(VERBOSE_LOG_LEVEL, line)
 
         # if we reach this point, the process exited
-        logger.debug(
-            "CLIRaop process stopped with errorcode %s",
-            self._cliraop_proc.returncode,
-        )
         if airplay_player.active_stream == self:
             mass_player.state = PlayerState.IDLE
             self.mass.players.update(airplay_player.player_id)
 
     async def _audio_reader(self) -> None:
-        """Read audio chunks and send them to the cliraop process."""
+        """Send audio chunks to the cliraop process."""
         logger = self.airplay_player.logger
         mass_player = self.mass.players.get(self.airplay_player.player_id, True)
         queue = self.mass.player_queues.get_active_queue(mass_player.active_source)
@@ -370,16 +363,13 @@ class AirplayStreamJob:
         )
         prev_metadata_checksum: str = ""
         prev_progress_report: float = 0
-        async for chunk in self._audio_iterator:
-            if not self.running:
-                return
-            self._cliraop_proc.stdin.write(chunk)
-            try:
-                await self._cliraop_proc.stdin.drain()
-            except (BrokenPipeError, ConnectionResetError):
-                break
-            if not self.running:
+
+        async for chunk in self.stream_job.iter_player_audio(
+            self.airplay_player.player_id, AIRPLAY_PCM_FORMAT
+        ):
+            if self._stop_requested:
                 return
+            await self._cliraop_proc.write(chunk)
             # send metadata to player(s) if needed
             # NOTE: this must all be done in separate tasks to not disturb audio
             now = time.time()
@@ -397,10 +387,7 @@ class AirplayStreamJob:
                     prev_progress_report = now
                     self.mass.create_task(self._send_progress(queue))
         # send EOF
-        if self._cliraop_proc.returncode is None and not self._cliraop_proc.stdin.is_closing():
-            self._cliraop_proc.stdin.write_eof()
-            with suppress(BrokenPipeError, ConnectionResetError):
-                await self._cliraop_proc.stdin.drain()
+        await self._cliraop_proc.write_eof()
         logger.debug(
             "Finished RAOP stream for Queue %s to %s",
             queue.display_name,
@@ -465,7 +452,7 @@ class AirPlayPlayer:
     discovery_info: AsyncServiceInfo
     address: str
     logger: logging.Logger
-    active_stream: AirplayStreamJob | None = None
+    active_stream: AirplayStream | None = None
 
 
 class AirplayProvider(PlayerProvider):
@@ -620,39 +607,30 @@ class AirplayProvider(PlayerProvider):
         for airplay_player in self._get_sync_clients(player_id):
             if airplay_player.active_stream and airplay_player.active_stream.running:
                 await airplay_player.active_stream.stop(wait=False)
-        pcm_format = AudioFormat(
-            content_type=ContentType.PCM_S16LE,
-            sample_rate=44100,
-            bit_depth=16,
-            channels=2,
-        )
-        if queue_item.media_type == MediaType.ANNOUNCEMENT:
-            # stream announcement url directly
-            stream_job = None
-        elif (
-            queue_item.queue_id.startswith(UGP_PREFIX)
-            and (stream_job := self.mass.streams.multi_client_jobs.get(queue_item.queue_id))
-            and stream_job.pending
-        ):
-            # handle special case for UGP multi client stream
-            pass
-        elif player.group_childs:
-            # create a new multi client flow stream
-            stream_job = await self.mass.streams.create_multi_client_stream_job(
-                queue_item.queue_id,
-                queue_item,
-                pcm_bit_depth=16,
-                pcm_sample_rate=44100,
-            )
+
+        if queue_item.queue_id.startswith(UGP_PREFIX):
+            # special case: we got forwarded a request from the UGP
+            # use the existing stream job that was already created by UGP
+            stream_job = self.mass.streams.stream_jobs[queue_item.queue_id]
         else:
-            # for a single player we just consume the flow stream directly
-            stream_job = None
+            if queue_item.media_type == MediaType.ANNOUNCEMENT:
+                # stream announcement url directly
+                audio_source = get_media_stream(
+                    self.mass, queue_item.streamdetails, pcm_format=AIRPLAY_PCM_FORMAT
+                )
+            else:
+                queue = self.mass.player_queues.get(queue_item.queue_id)
+                audio_source = self.mass.streams.get_flow_stream(
+                    queue, start_queue_item=queue_item, pcm_format=AIRPLAY_PCM_FORMAT
+                )
+            stream_job = self.mass.streams.create_stream_job(
+                queue_item.queue_id, pcm_audio_source=audio_source, pcm_format=AIRPLAY_PCM_FORMAT
+            )
 
-        # Python is not suitable for realtime audio streaming.
-        # So, I've decided to go the fancy route here. I've created a small binary
-        # written in C based on libraop to do the actual timestamped playback.
-        # the raw pcm audio is fed to the stdin of this cliraop binary and we can
-        # send some commands over a named pipe.
+        # Python is not suitable for realtime audio streaming so we do the actual streaming
+        # of (RAOP) audio using a small executable written in C based on libraop to do the actual
+        # timestamped playback. The raw pcm audio is fed to the process stdin, read by the
+        # executable, and we can send some interactive commands over a named pipe.
 
         # get current ntp before we start
         _, stdout = await check_output(f"{self.cliraop_bin} -ntp")
@@ -661,32 +639,10 @@ class AirplayProvider(PlayerProvider):
         # setup Raop process for player and its sync childs
         async with asyncio.TaskGroup() as tg:
             for airplay_player in self._get_sync_clients(player_id):
-                if stream_job:
-                    stream_job.expected_players.add(airplay_player.player_id)
-                    audio_iterator = stream_job.subscribe(
-                        player_id=airplay_player.player_id,
-                        output_format=pcm_format,
-                    )
-                elif queue_item.media_type == MediaType.ANNOUNCEMENT:
-                    # stream announcement url directly
-                    audio_iterator = get_media_stream(
-                        self.mass, queue_item.streamdetails, pcm_format=pcm_format
-                    )
-                else:
-                    queue = self.mass.player_queues.get_active_queue(queue_item.queue_id)
-                    audio_iterator = get_ffmpeg_stream(
-                        self.mass.streams.get_flow_stream(
-                            queue,
-                            start_queue_item=queue_item,
-                            pcm_format=pcm_format,
-                        ),
-                        input_format=pcm_format,
-                        output_format=pcm_format,
-                        filter_params=get_player_filter_params(self.mass, airplay_player.player_id),
-                    )
-                airplay_player.active_stream = AirplayStreamJob(self, airplay_player)
-                tg.create_task(airplay_player.active_stream.start(start_ntp, audio_iterator))
-        if stream_job and queue_item.queue_item_id != "flow":
+                stream_job.expected_players.add(airplay_player.player_id)
+                airplay_player.active_stream = AirplayStream(self, airplay_player)
+                tg.create_task(airplay_player.active_stream.start(start_ntp, stream_job))
+        if not queue_item.queue_id.startswith(UGP_PREFIX):
             stream_job.start()
 
     async def cmd_volume_set(self, player_id: str, volume_level: int) -> None:
@@ -929,7 +885,7 @@ class AirplayProvider(PlayerProvider):
                 self.mass.create_task(self.mass.players.cmd_volume_down(player_id))
             elif path == "/ctrl-int/1/shuffle_songs":
                 queue = self.mass.player_queues.get(player_id)
-                self.mass.create_task(
+                self.mass.loop.call_soon(
                     self.mass.player_queues.set_shuffle(
                         active_queue.queue_id, not queue.shuffle_enabled
                     )
index 4507cb42bf99075522770c042ddea70c9f9d81e5..2e0a8e326566f53a3c36f345f2dd50135c24f218 100755 (executable)
Binary files a/music_assistant/server/providers/airplay/bin/cliraop-linux-aarch64 and b/music_assistant/server/providers/airplay/bin/cliraop-linux-aarch64 differ
index 8661219a46a91138450853158af63773d39049b5..ed7ddfbc1eb964bd81ab4702aa31bd8dc561a0cb 100755 (executable)
Binary files a/music_assistant/server/providers/airplay/bin/cliraop-linux-x86_64 and b/music_assistant/server/providers/airplay/bin/cliraop-linux-x86_64 differ
index 38de7c6113fd22af48cc3167feb6aaac31f0e68d..fbd633a0bf6f5f51d010561a1588819ddeb2cfe1 100755 (executable)
Binary files a/music_assistant/server/providers/airplay/bin/cliraop-macos-arm64 and b/music_assistant/server/providers/airplay/bin/cliraop-macos-arm64 differ
index 3e6528b2c7bea68298a89dc15f650882c54f5b0b..0c849de7af5ecc3d6b8e64555f957fb4eae82fc8 100644 (file)
@@ -244,7 +244,7 @@ class ChromecastProvider(PlayerProvider):
         use_flow_mode = await self.mass.config.get_player_config_value(
             player_id, CONF_FLOW_MODE
         ) or await self.mass.config.get_player_config_value(player_id, CONF_CROSSFADE)
-        url = await self.mass.streams.resolve_stream_url(
+        url = self.mass.streams.resolve_stream_url(
             player_id,
             queue_item=queue_item,
             output_codec=ContentType.FLAC,
@@ -268,7 +268,7 @@ class ChromecastProvider(PlayerProvider):
             url = self.mass.streams.get_command_url(queue_item, "next")
             queue_item = None
         else:
-            url = await self.mass.streams.resolve_stream_url(
+            url = self.mass.streams.resolve_stream_url(
                 player_id,
                 queue_item=queue_item,
                 output_codec=ContentType.FLAC,
index d69b8e000a0925d9a44266534f064779efc1c593..4f7994a1cef33aa5f7219c5737d0b4ddcd90b86e 100644 (file)
@@ -350,7 +350,7 @@ class DLNAPlayerProvider(PlayerProvider):
         """Handle PLAY MEDIA on given player."""
         use_flow_mode = await self.mass.config.get_player_config_value(player_id, CONF_FLOW_MODE)
         enforce_mp3 = await self.mass.config.get_player_config_value(player_id, CONF_ENFORCE_MP3)
-        url = await self.mass.streams.resolve_stream_url(
+        url = self.mass.streams.resolve_stream_url(
             player_id,
             queue_item=queue_item,
             output_codec=ContentType.MP3 if enforce_mp3 else ContentType.FLAC,
@@ -383,7 +383,7 @@ class DLNAPlayerProvider(PlayerProvider):
     async def enqueue_next_queue_item(self, player_id: str, queue_item: QueueItem) -> None:
         """Handle enqueuing of the next queue item on the player."""
         dlna_player = self.dlnaplayers[player_id]
-        url = await self.mass.streams.resolve_stream_url(
+        url = self.mass.streams.resolve_stream_url(
             player_id,
             queue_item=queue_item,
             output_codec=ContentType.FLAC,
index d51e6589d6782134c216dca0a9f3a3920ab20f1e..2c296b2a418190aa0085ab2836e5121ad88dbd6e 100644 (file)
@@ -186,7 +186,7 @@ class FullyKioskProvider(PlayerProvider):
         """Handle PLAY MEDIA on given player."""
         player = self.mass.players.get(player_id)
         enforce_mp3 = await self.mass.config.get_player_config_value(player_id, CONF_ENFORCE_MP3)
-        url = await self.mass.streams.resolve_stream_url(
+        url = self.mass.streams.resolve_stream_url(
             player_id,
             queue_item=queue_item,
             output_codec=ContentType.MP3 if enforce_mp3 else ContentType.FLAC,
index 10a275d663b7aa3c74e4adb64c8657905c4fda78..dd726857b538c0b94187ceba30e1669d6a053dea 100644 (file)
@@ -253,7 +253,7 @@ class HomeAssistantPlayers(PlayerProvider):
         """Handle PLAY MEDIA on given player."""
         use_flow_mode = await self.mass.config.get_player_config_value(player_id, CONF_FLOW_MODE)
         enforce_mp3 = await self.mass.config.get_player_config_value(player_id, CONF_ENFORCE_MP3)
-        url = await self.mass.streams.resolve_stream_url(
+        url = self.mass.streams.resolve_stream_url(
             player_id,
             queue_item=queue_item,
             output_codec=ContentType.MP3 if enforce_mp3 else ContentType.FLAC,
@@ -288,7 +288,7 @@ class HomeAssistantPlayers(PlayerProvider):
         This will NOT be called if the end of the queue is reached (and repeat disabled).
         This will NOT be called if the player is using flow mode to playback the queue.
         """
-        url = await self.mass.streams.resolve_stream_url(
+        url = self.mass.streams.resolve_stream_url(
             player_id,
             queue_item=queue_item,
             output_codec=ContentType.FLAC,
index 6e20cf3ac00d7cc64a91c0e661d2729b401ddd84..08caea85deed43f3e5ef02106118e47f470384ea 100644 (file)
@@ -44,6 +44,7 @@ from music_assistant.common.models.enums import (
     RepeatMode,
 )
 from music_assistant.common.models.errors import MusicAssistantError, SetupFailedError
+from music_assistant.common.models.media_items import AudioFormat
 from music_assistant.common.models.player import DeviceInfo, Player
 from music_assistant.constants import (
     CONF_CROSSFADE,
@@ -55,6 +56,7 @@ from music_assistant.constants import (
     VERBOSE_LOG_LEVEL,
 )
 from music_assistant.server.models.player_provider import PlayerProvider
+from music_assistant.server.providers.ugp import UGP_PREFIX
 
 if TYPE_CHECKING:
     from aioslimproto.models import SlimEvent
@@ -79,10 +81,10 @@ STATE_MAP = {
 REPEATMODE_MAP = {RepeatMode.OFF: 0, RepeatMode.ONE: 1, RepeatMode.ALL: 2}
 
 # sync constants
-MIN_DEVIATION_ADJUST = 6  # 6 milliseconds
+MIN_DEVIATION_ADJUST = 8  # 5 milliseconds
 MIN_REQ_PLAYPOINTS = 8  # we need at least 8 measurements
-DEVIATION_JUMP_IGNORE = 2000  # ignore a sudden unrealistic jump
-MAX_SKIP_AHEAD_MS = 500  # 0.5 seconds
+DEVIATION_JUMP_IGNORE = 5000  # ignore a sudden unrealistic jump
+MAX_SKIP_AHEAD_MS = 800  # 0.8 seconds
 
 
 @dataclass
@@ -108,10 +110,10 @@ DEFAULT_VISUALIZATION = SlimVisualisationType.SPECTRUM_ANALYZER.value
 CONF_ENTRY_DISPLAY = ConfigEntry(
     key=CONF_DISPLAY,
     type=ConfigEntryType.BOOLEAN,
-    default_value=True,
+    default_value=False,
     required=False,
     label="Enable display support",
-    description="Enable/disable native display support on " "squeezebox or squeezelite32 hardware.",
+    description="Enable/disable native display support on squeezebox or squeezelite32 hardware.",
     advanced=True,
 )
 CONF_ENTRY_VISUALIZATION = ConfigEntry(
@@ -216,6 +218,7 @@ class SlimprotoProvider(PlayerProvider):
 
     slimproto: SlimServer
     _sync_playpoints: dict[str, deque[SyncPlayPoint]]
+    _do_not_resync_before: dict[str, float]
 
     @property
     def supported_features(self) -> tuple[ProviderFeature, ...]:
@@ -225,6 +228,7 @@ class SlimprotoProvider(PlayerProvider):
     async def handle_async_init(self) -> None:
         """Handle async initialization of the provider."""
         self._sync_playpoints = {}
+        self._do_not_resync_before = {}
         self._resync_handle: asyncio.TimerHandle | None = None
         control_port = self.config.get_value(CONF_PORT)
         telnet_port = self.config.get_value(CONF_CLI_TELNET_PORT)
@@ -335,17 +339,30 @@ class SlimprotoProvider(PlayerProvider):
         if player.synced_to:
             msg = "A synced player cannot receive play commands directly"
             raise RuntimeError(msg)
-        enforce_mp3 = await self.mass.config.get_player_config_value(player_id, CONF_ENFORCE_MP3)
+
         if player.group_childs:
-            # player has sync members, we need to start a multi slimplayer stream job
-            stream_job = await self.mass.streams.create_multi_client_stream_job(
+            # player has sync members, we need to start a (multi-player) stream job
+            # to make sure that all clients receive the exact same audio
+            pcm_format = AudioFormat(
+                content_type=ContentType.from_bit_depth(24), sample_rate=48000, bit_depth=24
+            )
+            queue = self.mass.player_queues.get(queue_item.queue_id)
+            stream_job = self.mass.streams.create_stream_job(
                 queue_id=queue_item.queue_id,
-                start_queue_item=queue_item,
+                pcm_audio_source=self.mass.streams.get_flow_stream(
+                    queue,
+                    start_queue_item=queue_item,
+                    pcm_format=pcm_format,
+                ),
+                pcm_format=pcm_format,
             )
             # forward command to player and any connected sync members
             sync_clients = self._get_sync_clients(player_id)
             async with asyncio.TaskGroup() as tg:
                 for slimplayer in sync_clients:
+                    enforce_mp3 = await self.mass.config.get_player_config_value(
+                        slimplayer.player_id, CONF_ENFORCE_MP3
+                    )
                     tg.create_task(
                         self._handle_play_url(
                             slimplayer,
@@ -358,18 +375,19 @@ class SlimprotoProvider(PlayerProvider):
                             auto_play=False,
                         )
                     )
-            if queue_item.queue_item_id != "flow":
+            if not queue_item.queue_id.startswith(UGP_PREFIX):
                 stream_job.start()
         else:
             # regular, single player playback
             slimplayer = self.slimproto.get_player(player_id)
             if not slimplayer:
                 return
-            url = await self.mass.streams.resolve_stream_url(
+            enforce_mp3 = await self.mass.config.get_player_config_value(
+                player_id, CONF_ENFORCE_MP3
+            )
+            url = self.mass.streams.resolve_stream_url(
                 player_id,
                 queue_item=queue_item,
-                # for now just hardcode flac as we assume that every (modern)
-                # slimproto based player can handle that just fine
                 output_codec=ContentType.MP3 if enforce_mp3 else ContentType.FLAC,
                 flow_mode=False,
             )
@@ -386,7 +404,7 @@ class SlimprotoProvider(PlayerProvider):
         if not (slimplayer := self.slimproto.get_player(player_id)):
             return
         enforce_mp3 = await self.mass.config.get_player_config_value(player_id, CONF_ENFORCE_MP3)
-        url = await self.mass.streams.resolve_stream_url(
+        url = self.mass.streams.resolve_stream_url(
             player_id,
             queue_item=queue_item,
             output_codec=ContentType.MP3 if enforce_mp3 else ContentType.FLAC,
@@ -682,9 +700,9 @@ class SlimprotoProvider(PlayerProvider):
             self.mass.player_queues.set_shuffle(queue.queue_id, not queue.shuffle_enabled)
             slimplayer.extra_data["playlist shuffle"] = int(queue.shuffle_enabled)
             slimplayer.signal_update()
-        elif event.data == "button jump_fwd":
+        elif event.data in ("button jump_fwd", "button fwd"):
             await self.mass.player_queues.next(queue.queue_id)
-        elif event.data == "button jump_rew":
+        elif event.data in ("button jump_rew", "button rew"):
             await self.mass.player_queues.previous(queue.queue_id)
         elif event.data.startswith("time "):
             # seek request
@@ -715,12 +733,15 @@ class SlimprotoProvider(PlayerProvider):
         sync_playpoints = self._sync_playpoints[slimplayer.player_id]
 
         active_queue = self.mass.player_queues.get_active_queue(slimplayer.player_id)
-        stream_job = self.mass.streams.multi_client_jobs.get(active_queue.queue_id)
+        stream_job = self.mass.streams.stream_jobs.get(active_queue.queue_id)
         if not stream_job:
             # should not happen, but just in case
             return
 
         now = time.time()
+        if now < self._do_not_resync_before[slimplayer.player_id]:
+            return
+
         last_playpoint = sync_playpoints[-1] if sync_playpoints else None
         if last_playpoint and (now - last_playpoint.timestamp) > 10:
             # last playpoint is too old, invalidate
@@ -741,7 +762,8 @@ class SlimprotoProvider(PlayerProvider):
         # we can now append the current playpoint to our list
         sync_playpoints.append(SyncPlayPoint(now, stream_job.job_id, diff))
 
-        if len(sync_playpoints) < MIN_REQ_PLAYPOINTS:
+        min_req_playpoints = 2 if sync_master.elapsed_seconds < 2 else MIN_REQ_PLAYPOINTS
+        if len(sync_playpoints) < min_req_playpoints:
             return
 
         # get the average diff
@@ -753,22 +775,20 @@ class SlimprotoProvider(PlayerProvider):
 
         # resync the player by skipping ahead or pause for x amount of (milli)seconds
         sync_playpoints.clear()
+        self._do_not_resync_before[player.player_id] = now + 5
         if avg_diff > MAX_SKIP_AHEAD_MS:
             # player lagging behind more than MAX_SKIP_AHEAD_MS,
             # we need to correct the sync_master
             self.logger.debug("%s resync: pauseFor %sms", sync_master.name, delta)
             self.mass.create_task(sync_master.pause_for(delta))
-            sync_master._elapsed_milliseconds -= delta
         elif avg_diff > 0:
             # handle player lagging behind, fix with skip_ahead
             self.logger.debug("%s resync: skipAhead %sms", player.display_name, delta)
             self.mass.create_task(slimplayer.skip_over(delta))
-            sync_master._elapsed_milliseconds += delta
         else:
             # handle player is drifting too far ahead, use pause_for to adjust
             self.logger.debug("%s resync: pauseFor %sms", player.display_name, delta)
             self.mass.create_task(slimplayer.pause_for(delta))
-            sync_master._elapsed_milliseconds -= delta
 
     async def _handle_buffer_ready(self, slimplayer: SlimClient) -> None:
         """Handle buffer ready event, player has buffered a (new) track.
@@ -799,12 +819,13 @@ class SlimprotoProvider(PlayerProvider):
         async with asyncio.TaskGroup() as tg:
             for _client in self._get_sync_clients(player.player_id):
                 self._sync_playpoints.setdefault(
-                    _client.player_id, deque(maxlen=MIN_REQ_PLAYPOINTS * 2)
+                    _client.player_id, deque(maxlen=MIN_REQ_PLAYPOINTS)
                 ).clear()
                 # NOTE: Officially you should do an unpause_at based on the player timestamp
                 # but I did not have any good results with that.
                 # Instead just start playback on all players and let the sync logic work out
                 # the delays etc.
+                self._do_not_resync_before[_client.player_id] = time.time() + 1
                 tg.create_task(_client.unpause_at(0))
 
     async def _handle_connected(self, slimplayer: SlimClient) -> None:
@@ -830,6 +851,7 @@ class SlimprotoProvider(PlayerProvider):
             init_volume = DEFAULT_PLAYER_VOLUME
             init_power = False
         await slimplayer.power(init_power)
+        await slimplayer.stop()
         await slimplayer.volume_set(init_volume)
 
     def _get_sync_clients(self, player_id: str) -> Iterator[SlimClient]:
index b6becf6ac8efc9a6e2742fcb157e930b0f769f61..0fa541b31d5196e2fa4a08a9337e2c23552b6b51 100644 (file)
@@ -4,13 +4,17 @@ from __future__ import annotations
 
 import asyncio
 import random
+import socket
 import time
 from contextlib import suppress
 from typing import TYPE_CHECKING, cast
 
 from snapcast.control import create_server
 from snapcast.control.client import Snapclient
+from zeroconf import NonUniqueNameException
+from zeroconf.asyncio import AsyncServiceInfo
 
+from music_assistant.common.helpers.util import get_ip_pton
 from music_assistant.common.models.config_entries import (
     CONF_ENTRY_CROSSFADE,
     CONF_ENTRY_CROSSFADE_DURATION,
@@ -136,6 +140,7 @@ class SnapCastProvider(PlayerProvider):
         self._snapcast_server_control_port = self.config.get_value(CONF_SERVER_CONTROL_PORT)
         self._use_builtin_server = not self.config.get_value(CONF_USE_EXTERNAL_SERVER)
         self._stream_tasks = {}
+
         if self._use_builtin_server:
             # start our own builtin snapserver
             self._snapserver_started = asyncio.Event()
@@ -169,11 +174,17 @@ class SnapCastProvider(PlayerProvider):
         """Handle close/cleanup of the provider."""
         for client in self._snapserver.clients:
             await self.cmd_stop(client.identifier)
-        await self._snapserver.stop()
-        self._snapserver_started.clear()
         if self._snapserver_runner and not self._snapserver_runner.done():
             self._snapserver_runner.cancel()
-        await asyncio.sleep(2)  # prevent race conditions when reloading
+        await asyncio.sleep(6)  # prevent race conditions when reloading
+        await self._snapserver.stop()
+        self._snapserver_started.clear()
+
+    def on_player_config_removed(self, player_id: str) -> None:
+        """Call (by config manager) when the configuration of a player is removed."""
+        super().on_player_config_removed(player_id)
+        if self._use_builtin_server:
+            self.mass.create_task(self._snapserver.delete_client(player_id))
 
     def _handle_update(self) -> None:
         """Process Snapcast init Player/Group and set callback ."""
@@ -297,60 +308,59 @@ class SnapCastProvider(PlayerProvider):
             bit_depth=16,
             channels=2,
         )
-        if queue_item.media_type == MediaType.ANNOUNCEMENT:
-            # stream announcement url directly
-            audio_iterator = get_media_stream(
-                self.mass, queue_item.streamdetails, pcm_format=pcm_format
-            )
-        elif (
-            queue_item.queue_id.startswith(UGP_PREFIX)
-            and (stream_job := self.mass.streams.multi_client_jobs.get(queue_item.queue_id))
-            and stream_job.pending
-        ):
-            # handle special case for UGP multi client stream
-            stream_job = self.mass.streams.multi_client_jobs.get(queue_item.queue_id)
-            stream_job.expected_players.add(player_id)
-            audio_iterator = stream_job.subscribe(
-                player_id=player_id,
-                output_format=pcm_format,
-            )
+
+        if queue_item.queue_id.startswith(UGP_PREFIX):
+            # special case: we got forwarded a request from the UGP
+            # use the existing stream job that was already created by UGP
+            stream_job = self.mass.streams.stream_jobs[queue_item.queue_id]
         else:
-            audio_iterator = self.mass.streams.get_flow_stream(
-                queue,
-                start_queue_item=queue_item,
-                pcm_format=pcm_format,
+            if queue_item.media_type == MediaType.ANNOUNCEMENT:
+                # stream announcement url directly
+                audio_source = get_media_stream(
+                    self.mass, queue_item.streamdetails, pcm_format=pcm_format
+                )
+            else:
+                queue = self.mass.player_queues.get(queue_item.queue_id)
+                audio_source = self.mass.streams.get_flow_stream(
+                    queue, start_queue_item=queue_item, pcm_format=pcm_format
+                )
+            stream_job = self.mass.streams.create_stream_job(
+                queue_item.queue_id, pcm_audio_source=audio_source, pcm_format=pcm_format
             )
+        stream_job.expected_players.add(player_id)
 
         async def _streamer() -> None:
             host = self._snapcast_server_host
-            _, writer = await asyncio.open_connection(host, port)
-            self.logger.debug("Opened connection to %s:%s", host, port)
-            player.current_item_id = f"{queue_item.queue_id}.{queue_item.queue_item_id}"
-            player.elapsed_time = 0
-            player.elapsed_time_last_updated = time.time()
-            player.state = PlayerState.PLAYING
-            self._set_childs_state(player_id, PlayerState.PLAYING)
-            self.mass.players.register_or_update(player)
+            self.mass.players.update(player_id)
+
+            def stream_callback(_stream) -> None:
+                player.state = PlayerState(stream.status)
+                if player.state == PlayerState.PLAYING:
+                    player.current_item_id = f"{queue_item.queue_id}.{queue_item.queue_item_id}"
+                    player.elapsed_time = 0
+                    player.elapsed_time_last_updated = time.time()
+                self._set_childs_state(player_id, player.state)
+
+            stream.set_callback(stream_callback)
+            stream_path = f"tcp://{host}:{port}"
+            self.logger.debug("Start streaming to %s", stream_path)
             try:
-                async for pcm_chunk in audio_iterator:
-                    writer.write(pcm_chunk)
-                    await writer.drain()
-                # end of the stream reached
-                if writer.can_write_eof():
-                    writer.write_eof()
-                    await writer.drain()
-                # we need to wait a bit before removing the stream to ensure
-                # that all snapclients have consumed the audio
-                # https://github.com/music-assistant/hass-music-assistant/issues/1962
-                await asyncio.sleep(30)
+                await stream_job.stream_to_custom_output_path(
+                    player_id, pcm_format, f"tcp://{host}:{port}"
+                )
+                # we need to wait a bit for the stream status to become idle
+                # to ensure that all snapclients have consumed the audio
+                await self.mass.players.wait_for_state(player, PlayerState.IDLE)
             finally:
-                if not writer.is_closing():
-                    writer.close()
+                self.logger.debug("Finished streaming to %s", stream_path)
+                # there is no way to unsub the callback to we do this nasty
+                stream._callback_func = None
                 await self._snapserver.stream_remove_stream(stream.identifier)
-                self.logger.debug("Closed connection to %s:%s", host, port)
 
         # start streaming the queue (pcm) audio in a background task
         self._stream_tasks[player_id] = asyncio.create_task(_streamer())
+        if not queue_item.queue_id.startswith(UGP_PREFIX):
+            stream_job.start()
 
     def _get_snapgroup(self, player_id: str) -> Snapgroup:
         """Get snapcast group for given player_id."""
@@ -416,6 +426,38 @@ class SnapCastProvider(PlayerProvider):
             raise RuntimeError("Snapserver is already started!")
         logger = self.logger.getChild("snapserver")
         logger.info("Starting builtin Snapserver...")
+        # register the snapcast mdns services
+        for name, port in (
+            ("-http", 1780),
+            ("-jsonrpc", 1705),
+            ("-stream", 1704),
+            ("-tcp", 1705),
+            ("", 1704),
+        ):
+            zeroconf_type = f"_snapcast{name}._tcp.local."
+            try:
+                info = AsyncServiceInfo(
+                    zeroconf_type,
+                    name=f"Snapcast.{zeroconf_type}",
+                    properties={"is_mass": "true"},
+                    addresses=[await get_ip_pton(self.mass.webserver.publish_ip)],
+                    port=port,
+                    server=f"{socket.gethostname()}",
+                )
+                attr_name = f"zc_service_set{name}"
+                if getattr(self, attr_name, None):
+                    await self.mass.aiozc.async_update_service(info)
+                else:
+                    await self.mass.aiozc.async_register_service(info, strict=False)
+                setattr(self, attr_name, True)
+            except NonUniqueNameException:
+                self.logger.debug(
+                    "Could not register mdns record for %s as its already in use", zeroconf_type
+                )
+            except Exception as err:
+                self.logger.exception(
+                    "Could not register mdns record for %s: %s", zeroconf_type, str(err)
+                )
         async with AsyncProcess(
             ["snapserver"], enable_stdin=False, enable_stdout=True, enable_stderr=False
         ) as snapserver_proc:
index a31199f7387c0d91a3d804f949246ab4ca87f6e0..d5938754fde1b2578933df71ff7a2f407d0ed992 100644 (file)
@@ -3,8 +3,12 @@
   "domain": "snapcast",
   "name": "Snapcast",
   "description": "Support for snapcast server and clients.",
-  "codeowners": ["@SantigoSotoC"],
-  "requirements": ["snapcast-mod==2.4.3"],
+  "codeowners": [
+    "@SantigoSotoC"
+  ],
+  "requirements": [
+    "snapcast==2.3.6"
+  ],
   "documentation": "https://music-assistant.io/player-support/snapcast/",
   "multi_instance": false,
   "builtin": false,
index 0abe5b53c187855dac700ee3e2ca8b8476668378..e69482b1f8b409d8b78044c4df1670e63ef3cf80 100644 (file)
@@ -36,6 +36,7 @@ from music_assistant.common.models.player import DeviceInfo, Player
 from music_assistant.constants import CONF_CROSSFADE, VERBOSE_LOG_LEVEL
 from music_assistant.server.helpers.didl_lite import create_didl_metadata
 from music_assistant.server.models.player_provider import PlayerProvider
+from music_assistant.server.providers.ugp import UGP_PREFIX
 
 from .player import SonosPlayer
 
@@ -341,11 +342,6 @@ class SonosPlayerProvider(PlayerProvider):
         queue_item: QueueItem,
     ) -> None:
         """Handle PLAY MEDIA on given player."""
-        url = await self.mass.streams.resolve_stream_url(
-            player_id,
-            queue_item=queue_item,
-            output_codec=ContentType.FLAC,
-        )
         sonos_player = self.sonosplayers[player_id]
         mass_player = self.mass.players.get(player_id)
         if sonos_player.sync_coordinator:
@@ -355,10 +351,17 @@ class SonosPlayerProvider(PlayerProvider):
                 "accept play_media command, it is synced to another player."
             )
             raise PlayerCommandFailed(msg)
-        await self.mass.create_task(
+
+        is_flow_stream = queue_item.queue_item_id == "flow" or queue_item.queue_id.startswith(
+            UGP_PREFIX
+        )
+        url = self.mass.streams.resolve_stream_url(
+            player_id, queue_item=queue_item, output_codec=ContentType.FLAC
+        )
+        self.mass.create_task(
             sonos_player.soco.play_uri,
             url,
-            meta=create_didl_metadata(self.mass, url, queue_item),
+            meta=create_didl_metadata(self.mass, url, None if is_flow_stream else queue_item),
         )
 
     async def enqueue_next_queue_item(self, player_id: str, queue_item: QueueItem) -> None:
@@ -377,7 +380,7 @@ class SonosPlayerProvider(PlayerProvider):
         This will NOT be called if flow mode is enabled on the queue.
         """
         sonos_player = self.sonosplayers[player_id]
-        url = await self.mass.streams.resolve_stream_url(
+        url = self.mass.streams.resolve_stream_url(
             player_id,
             queue_item=queue_item,
             output_codec=ContentType.FLAC,
index 3a64e30d817623834ae012d5db718aed2ff23592..b99f700f790b1a73d1e9b44202bc0b2a1002bd93 100644 (file)
@@ -402,7 +402,7 @@ class SpotifyProvider(MusicProvider):
             # retry with ap-port set to invalid value, which will force fallback
             args += ["--ap-port", "12345"]
             async with AsyncProcess(args) as librespot_proc:
-                async for chunk in librespot_proc.iter_any(64000):
+                async for chunk in librespot_proc.iter_any():
                     yield chunk
             self._ap_workaround = True
 
index f11eb9253d79d17df252e5db540ac08a689ba1a9..8998adf1cb7e3f0288b5723e31ceaf23db2dd113 100644 (file)
@@ -20,14 +20,20 @@ from music_assistant.common.models.config_entries import (
 )
 from music_assistant.common.models.enums import (
     ConfigEntryType,
+    ContentType,
     PlayerFeature,
     PlayerState,
     PlayerType,
     ProviderFeature,
 )
+from music_assistant.common.models.media_items import AudioFormat
 from music_assistant.common.models.player import DeviceInfo, Player
 from music_assistant.common.models.queue_item import QueueItem
 from music_assistant.constants import CONF_CROSSFADE, CONF_GROUP_MEMBERS, SYNCGROUP_PREFIX
+from music_assistant.server.controllers.streams import (
+    FLOW_DEFAULT_BIT_DEPTH,
+    FLOW_DEFAULT_SAMPLE_RATE,
+)
 from music_assistant.server.models.player_provider import PlayerProvider
 
 if TYPE_CHECKING:
@@ -138,6 +144,8 @@ class UniversalGroupProvider(PlayerProvider):
                 if member.state == PlayerState.IDLE:
                     continue
                 tg.create_task(self.mass.players.cmd_stop(member.player_id))
+        if existing := self.mass.streams.stream_jobs.pop(player_id, None):
+            existing.stop()
 
     async def cmd_play(self, player_id: str) -> None:
         """Send PLAY command to given player."""
@@ -166,11 +174,22 @@ class UniversalGroupProvider(PlayerProvider):
         await self.cmd_power(player_id, True)
         group_player = self.mass.players.get(player_id)
 
+        await self.cmd_stop(player_id)
+
         # create a multi-client stream job - all (direct) child's of this UGP group
         # will subscribe to this multi client queue stream
-        stream_job = await self.mass.streams.create_multi_client_stream_job(
-            player_id,
-            start_queue_item=queue_item,
+        pcm_format = AudioFormat(
+            content_type=ContentType.from_bit_depth(FLOW_DEFAULT_BIT_DEPTH),
+            sample_rate=FLOW_DEFAULT_SAMPLE_RATE,
+            bit_depth=FLOW_DEFAULT_BIT_DEPTH,
+        )
+        queue = self.mass.player_queues.get(player_id)
+        stream_job = self.mass.streams.create_stream_job(
+            queue.queue_id,
+            pcm_audio_source=self.mass.streams.get_flow_stream(
+                queue=queue, start_queue_item=queue_item, pcm_format=pcm_format
+            ),
+            pcm_format=pcm_format,
         )
         # create a fake queue item to forward to downstream play_media commands
         ugp_queue_item = QueueItem(
index 461f01b04912b7f72d849bcde19aacb9cd867f5b..1f1d2c731b5f0e5084d9bf86b1fc0c31c5f04585 100644 (file)
@@ -29,6 +29,7 @@ from music_assistant.constants import (
     CONFIGURABLE_CORE_CONTROLLERS,
     MIN_SCHEMA_VERSION,
     ROOT_LOGGER_NAME,
+    VERBOSE_LOG_LEVEL,
 )
 from music_assistant.server.controllers.cache import CacheController
 from music_assistant.server.controllers.config import ConfigController
@@ -627,7 +628,13 @@ class MusicAssistant:
                 await info.async_request(zeroconf, 3000)
             await prov.on_mdns_service_state_change(name, state_change, info)
 
-        LOGGER.debug(f"Service {name} of type {service_type} state changed: {state_change}")
+        LOGGER.log(
+            VERBOSE_LOG_LEVEL,
+            "Service %s of type %s state changed: %s",
+            name,
+            service_type,
+            state_change,
+        )
         for prov in self._providers.values():
             if not prov.manifest.mdns_discovery:
                 continue
@@ -647,9 +654,6 @@ class MusicAssistant:
     ) -> bool | None:
         """Exit context manager."""
         await self.stop()
-        if exc_val:
-            raise exc_val
-        return exc_type
 
     async def _update_available_providers_cache(self) -> None:
         """Update the global cache variable of loaded/available providers."""
index 3d0f3245cb429d064f1a2fcd8b9cc3403fce60b8..c85e39a24b917fb166264e56d18f47d1a2bebb59 100644 (file)
@@ -32,7 +32,7 @@ python-fullykiosk==0.0.12
 python-slugify==8.0.4
 radios==0.3.0
 shortuuid==1.0.12
-snapcast-mod==2.4.3
+snapcast==2.3.6
 soco==0.30.2
 sonos-websocket==0.1.3
 tidalapi==0.7.4