Flow stream enhancements (#1162)
authorMarcel van der Veldt <m.vanderveldt@outlook.com>
Thu, 21 Mar 2024 23:43:05 +0000 (00:43 +0100)
committerGitHub <noreply@github.com>
Thu, 21 Mar 2024 23:43:05 +0000 (00:43 +0100)
music_assistant/server/controllers/streams.py
music_assistant/server/helpers/audio.py
music_assistant/server/helpers/didl_lite.py
music_assistant/server/helpers/process.py
music_assistant/server/providers/airplay/__init__.py
music_assistant/server/providers/dlna/__init__.py
music_assistant/server/providers/sonos/__init__.py
music_assistant/server/server.py

index cde50047c7a59e5be0ea551b39c1d09b7fb66c75..89fefd41f345f7856cf2c67a6b0891e72b135035 100644 (file)
@@ -13,13 +13,17 @@ import logging
 import time
 import urllib.parse
 from collections.abc import AsyncGenerator
-from contextlib import asynccontextmanager
 from typing import TYPE_CHECKING
 
 import shortuuid
 from aiohttp import web
 
-from music_assistant.common.helpers.util import get_ip, select_free_port, try_parse_bool
+from music_assistant.common.helpers.util import (
+    empty_queue,
+    get_ip,
+    select_free_port,
+    try_parse_bool,
+)
 from music_assistant.common.models.config_entries import (
     ConfigEntry,
     ConfigValueOption,
@@ -85,8 +89,8 @@ class QueueStreamJob:
     The whole idea here is that the (pcm) audio source can be sent to multiple
     players at once. For example for (slimproto/airplay) syncgroups and universal group.
 
-    All client players receive the exact same audio chunks from the source audio,
-    then encoded and/or resampled to the player's preferences.
+    All client players receive the exact same PCM audio chunks from the source audio,
+    which then can be optionally encoded and/or resampled to the player's preferences.
     In case a stream is restarted (e.g. when seeking),
     a new QueueStreamJob will be created.
     """
@@ -104,43 +108,54 @@ class QueueStreamJob:
         self.mass = mass
         self.pcm_audio_source = pcm_audio_source
         self.pcm_format = pcm_format
-        self.auto_start = auto_start
         self.expected_players: set[str] = set()
         self.job_id = shortuuid.uuid()
         self.bytes_streamed: int = 0
-        self.logger = self.mass.streams.logger.getChild(f"stream_job.{self.job_id}")
-        self._subscribed_players: dict[str, AsyncProcess] = {}
+        self.logger = self.mass.streams.logger
+        self.subscribed_players: dict[str, asyncio.Queue[bytes]] = {}
         self._finished = False
-        self._running = False
-        self.allow_start = asyncio.Event()
+        self.allow_start = auto_start
+        self._all_clients_connected = asyncio.Event()
         self._audio_task = asyncio.create_task(self._stream_job_runner())
 
     @property
     def finished(self) -> bool:
         """Return if this StreamJob is finished."""
-        return self._finished or self._audio_task and self._audio_task.done()
+        return self._finished or (self._audio_task and self._audio_task.done())
 
     @property
     def pending(self) -> bool:
         """Return if this Job is pending start."""
-        return not self.finished and not self.running
+        return not self.finished and not self._all_clients_connected.is_set()
 
     @property
     def running(self) -> bool:
         """Return if this Job is running."""
-        return self._running and self._audio_task and not self._audio_task.done()
+        return (
+            self._all_clients_connected.is_set()
+            and self._audio_task
+            and not self._audio_task.done()
+        )
 
     def start(self) -> None:
         """Start running (send audio chunks to connected players)."""
         if self.finished:
             raise RuntimeError("Task is already finished")
-        self.allow_start.set()
+        self.allow_start = True
+        if self.expected_players and len(self.subscribed_players) >= len(self.expected_players):
+            self._all_clients_connected.set()
 
     def stop(self) -> None:
         """Stop running this job."""
         if self._audio_task and not self._audio_task.done():
             self._audio_task.cancel()
+        if not self._finished:
+            # we need to make sure that we close the async generator
+            task = asyncio.create_task(self.pcm_audio_source.__anext__())
+            task.cancel()
         self._finished = True
+        for sub_queue in self.subscribed_players.values():
+            empty_queue(sub_queue)
 
     def resolve_stream_url(self, player_id: str, output_codec: ContentType) -> str:
         """Resolve the childplayer specific stream URL to this streamjob."""
@@ -167,44 +182,27 @@ class QueueStreamJob:
         self, player_id: str, output_format: AudioFormat, chunk_size: int | None = None
     ) -> AsyncGenerator[bytes, None]:
         """Subscribe consumer and iterate player-specific audio."""
-        ffmpeg_args = get_ffmpeg_args(
+        async for chunk in get_ffmpeg_stream(
+            audio_input=self.subscribe(player_id),
             input_format=self.pcm_format,
             output_format=output_format,
             filter_params=get_player_filter_params(self.mass, player_id),
-            extra_args=[],
-            input_path="-",
-            output_path="-",
-            loglevel="info" if self.logger.isEnabledFor(VERBOSE_LOG_LEVEL) else "quiet",
-        )
-        # launch ffmpeg process with player specific settings
-        # the stream_job_runner will start pushing pcm chunks to the stdin
-        # we then read the players-specific (encoded) output chunks
-        # from ffmpeg stdout and yield them
-        async with AsyncProcess(
-            ffmpeg_args, enable_stdin=True, enable_stdout=True, enable_stderr=False
-        ) as ffmpeg_proc, self.subscribe(player_id, ffmpeg_proc):
-            # read final chunks from ffmpeg's stdout
-            iterator = (
-                ffmpeg_proc.iter_chunked(chunk_size) if chunk_size else ffmpeg_proc.iter_any()
-            )
-            async for chunk in iterator:
-                try:
-                    yield chunk
-                except (BrokenPipeError, ConnectionResetError):
-                    # race condition?
-                    break
+            chunk_size=chunk_size,
+        ):
+            yield chunk
 
     async def stream_to_custom_output_path(
-        self, player_id: str, output_format: AudioFormat, output_path: str
+        self, player_id: str, output_format: AudioFormat, output_path: str | int
     ) -> None:
         """Subscribe consumer and instruct ffmpeg to send the audio to the given output path."""
+        custom_file_pointer = isinstance(output_path, int)
         ffmpeg_args = get_ffmpeg_args(
             input_format=self.pcm_format,
             output_format=output_format,
             filter_params=get_player_filter_params(self.mass, player_id),
             extra_args=[],
             input_path="-",
-            output_path=output_path,
+            output_path="-" if custom_file_pointer else output_path,
             loglevel="info" if self.logger.isEnabledFor(VERBOSE_LOG_LEVEL) else "quiet",
         )
         # launch ffmpeg process with player specific settings
@@ -213,72 +211,94 @@ class QueueStreamJob:
         async with AsyncProcess(
             ffmpeg_args,
             enable_stdin=True,
-            enable_stdout=False,
+            enable_stdout=custom_file_pointer,
             enable_stderr=False,
-        ) as ffmpeg_proc, self.subscribe(player_id, ffmpeg_proc):
+            custom_stdin=self.subscribe(player_id),
+            custom_stdout=output_path if custom_file_pointer else None,
+            name="ffmpeg_custom_output_path",
+        ) as ffmpeg_proc:
             # we simply wait for the process to exit
             await ffmpeg_proc.wait()
 
-    @asynccontextmanager
-    async def subscribe(
-        self, player_id: str, ffmpeg_proc: AsyncProcess
-    ) -> AsyncGenerator[QueueStreamJob]:
-        """Subscribe consumer's (output) ffmpeg process."""
-        if self.running:
-            # client subscribes while we're already started
-            # that will probably cause side effects but let it go
-            self.logger.warning(
-                "Player %s is joining while the stream is already started!", player_id
-            )
+    async def subscribe(self, player_id: str) -> AsyncGenerator[bytes, None]:
+        """Subscribe consumer and iterate incoming chunks on the queue."""
         try:
-            self._subscribed_players[player_id] = ffmpeg_proc
-            self.logger.debug("Subscribed player %s", player_id)
-            if self.auto_start and len(self._subscribed_players) == len(self.expected_players):
-                self.allow_start.set()
-            yield self
+            self.subscribed_players[player_id] = sub_queue = asyncio.Queue(2)
+
+            if self._all_clients_connected.is_set():
+                # client subscribes while we're already started
+                self.logger.warning(
+                    "Client %s is joining while the stream is already started", player_id
+                )
+
+            self.logger.debug("Subscribed client %s", player_id)
+
+            if (
+                self.expected_players
+                and self.allow_start
+                and len(self.subscribed_players) == len(self.expected_players)
+            ):
+                # we reached the number of expected subscribers, set event
+                # so that chunks can be pushed
+                self._all_clients_connected.set()
+
+            # keep reading audio chunks from the queue until we receive an empty one
+            while True:
+                chunk = await sub_queue.get()
+                if chunk == b"":
+                    # EOF chunk received
+                    break
+                yield chunk
         finally:
-            self._subscribed_players.pop(player_id, None)
+            self.subscribed_players.pop(player_id, None)
             self.logger.debug("Unsubscribed client %s", player_id)
             # check if this was the last subscriber and we should cancel
-            await asyncio.sleep(5)
-            if len(self._subscribed_players) == 0 and not self.finished:
+            await asyncio.sleep(2)
+            if len(self.subscribed_players) == 0 and self._audio_task and not self.finished:
                 self.logger.debug("Cleaning up, all clients disappeared...")
                 self.stop()
 
+    async def _put_chunk(self, chunk: bytes) -> None:
+        """Put chunk of data to all subscribers."""
+        async with asyncio.TaskGroup() as tg:
+            for sub_queue in list(self.subscribed_players.values()):
+                # put this chunk on the player's subqueue
+                tg.create_task(sub_queue.put(chunk))
+        self.bytes_streamed += len(chunk)
+
     async def _stream_job_runner(self) -> None:
         """Feed audio chunks to StreamJob subscribers."""
-        await self.allow_start.wait()
-        retries = 50
-        while retries:
-            retries -= 1
-            await asyncio.sleep(0.1)
-            if len(self._subscribed_players) != len(self.expected_players):
-                continue
-            await asyncio.sleep(0.2)
-            if len(self._subscribed_players) != len(self.expected_players):
-                continue
-            break
-
-        self.logger.debug(
-            "Starting stream job %s with %s out of %s connected clients",
-            self.job_id,
-            len(self._subscribed_players),
-            len(self.expected_players),
-        )
+        chunk_num = 0
         async for chunk in self.pcm_audio_source:
-            num_subscribers = len(self._subscribed_players)
-            if num_subscribers == 0:
-                break
-            async with asyncio.TaskGroup() as tg:
-                for ffmpeg_proc in list(self._subscribed_players.values()):
-                    tg.create_task(ffmpeg_proc.write(chunk))
+            chunk_num += 1
+            if chunk_num == 1:
+                # wait until all expected clients are connected
+                try:
+                    async with asyncio.timeout(10):
+                        await self._all_clients_connected.wait()
+                except TimeoutError:
+                    if len(self.subscribed_players) == 0:
+                        self.logger.exception(
+                            "Abort multi client stream job  %s: "
+                            "client(s) did not connect within timeout",
+                            self.job_id,
+                        )
+                        break
+                    # not all clients connected but timeout expired, set flag and move on
+                    # with all clients that did connect
+                    self._all_clients_connected.set()
+                else:
+                    self.logger.debug(
+                        "Starting queue stream job %s with %s (out of %s) connected clients",
+                        self.job_id,
+                        len(self.subscribed_players),
+                        len(self.expected_players),
+                    )
 
-        # write EOF at end of queue stream
-        async with asyncio.TaskGroup() as tg:
-            for ffmpeg_proc in list(self._subscribed_players.values()):
-                tg.create_task(ffmpeg_proc.write_eof())
-        self.logger.debug("Finished stream job %s", self.job_id)
-        self._finished = True
+            await self._put_chunk(chunk)
+
+        # mark EOF with empty chunk
+        await self._put_chunk(b"")
 
 
 def parse_pcm_info(content_type: str) -> tuple[int, int, int]:
@@ -585,9 +605,13 @@ class StreamsController(CoreController):
             default_sample_rate=stream_job.pcm_format.sample_rate,
             default_bit_depth=stream_job.pcm_format.bit_depth,
         )
-        # prepare request, add some DLNA/UPNP compatible headers
-        enable_icy = request.headers.get("Icy-MetaData", "") == "1"
+        # play it safe: only allow icy metadata for mp3 and aac
+        enable_icy = request.headers.get(
+            "Icy-MetaData", ""
+        ) == "1" and output_format.content_type in (ContentType.MP3, ContentType.AAC)
         icy_meta_interval = 16384 * 4 if output_format.content_type.is_lossless() else 16384
+
+        # prepare request, add some DLNA/UPNP compatible headers
         headers = {
             **DEFAULT_STREAM_HEADERS,
             "Content-Type": f"audio/{output_format.output_format_str}",
@@ -606,6 +630,19 @@ class StreamsController(CoreController):
         if request.method != "GET":
             return resp
 
+        # some players (e.g. dlna, sonos) misbehave and do multiple GET requests
+        # to the stream in an attempt to get the audio details such as duration
+        # which is a bit pointless for our duration-less queue stream
+        # and it completely messes with the subscription logic
+        if player_id in stream_job.subscribed_players:
+            self.logger.warning(
+                "Player %s is making multiple requests "
+                "to the same stream, playback may be disturbed!",
+                player_id,
+            )
+        elif "rincon" in player_id.lower():
+            await asyncio.sleep(0.1)
+
         # all checks passed, start streaming!
         self.logger.debug(
             "Start serving Queue flow audio stream for queue %s to player %s",
@@ -912,7 +949,6 @@ class StreamsController(CoreController):
             if content_type == ContentType.PCM:
                 # resolve generic pcm type
                 content_type = ContentType.from_bit_depth(output_bit_depth)
-
         else:
             output_sample_rate = min(default_sample_rate, queue_player.max_sample_rate)
             player_max_bit_depth = 24 if queue_player.supports_24bit else 16
index 321b97552622f4ff4218c2440b8e96323d81853f..720d3ac54dfd94269a4953f17ca4e1e209a4c8b8 100644 (file)
@@ -335,7 +335,6 @@ async def get_media_stream(  # noqa: PLR0915
     Other than stripping silence at end and beginning and optional
     volume normalization this is the pure, unaltered audio data as PCM chunks.
     """
-    logger = LOGGER.getChild("media_stream")
     bytes_sent = 0
     is_radio = streamdetails.media_type == MediaType.RADIO or not streamdetails.duration
     if is_radio or streamdetails.seek_position:
@@ -377,27 +376,28 @@ async def get_media_stream(  # noqa: PLR0915
     )
 
     finished = False
-    logger.debug("start media stream for: %s", streamdetails.uri)
 
-    writer_task: asyncio.Task | None = None
     ffmpeg_proc = AsyncProcess(
-        ffmpeg_args, enable_stdin=streamdetails.direct is None, enable_stderr=True
+        ffmpeg_args,
+        enable_stdin=streamdetails.direct is None,
+        enable_stderr=True,
+        name="ffmpeg_media_stream",
     )
     await ffmpeg_proc.start()
+    logger = LOGGER.getChild("media_stream")
+    logger.debug("start media stream for: %s", streamdetails.uri)
 
     async def writer() -> None:
         """Task that grabs the source audio and feeds it to ffmpeg."""
-        logger.log(VERBOSE_LOG_LEVEL, "writer started for %s", streamdetails.uri)
         music_prov = mass.get_provider(streamdetails.provider)
         seek_pos = streamdetails.seek_position if streamdetails.can_seek else 0
         async for audio_chunk in music_prov.get_audio_stream(streamdetails, seek_pos):
             await ffmpeg_proc.write(audio_chunk)
         # write eof when last packet is received
         await ffmpeg_proc.write_eof()
-        logger.log(VERBOSE_LOG_LEVEL, "writer finished for %s", streamdetails.uri)
 
     if streamdetails.direct is None:
-        writer_task = asyncio.create_task(writer())
+        ffmpeg_proc.attached_tasks.append(asyncio.create_task(writer()))
 
     # get pcm chunks from stdout
     # we always stay one chunk behind to properly detect end of chunks
@@ -464,8 +464,7 @@ async def get_media_stream(  # noqa: PLR0915
                 streamdetails.uri,
                 seconds_streamed,
             )
-        if writer_task and not writer_task.done():
-            writer_task.cancel()
+
         # use communicate to read stderr and wait for exit
         # read log for loudness measurement (or errors)
         _, stderr = await ffmpeg_proc.communicate()
@@ -674,7 +673,6 @@ async def get_ffmpeg_stream(
     Takes care of resampling and/or recoding if needed,
     according to player preferences.
     """
-    logger = LOGGER.getChild("ffmpeg_stream")
     use_stdin = not isinstance(audio_input, str)
     ffmpeg_args = get_ffmpeg_args(
         input_format=input_format,
@@ -683,44 +681,20 @@ async def get_ffmpeg_stream(
         extra_args=extra_args or [],
         input_path="-" if use_stdin else audio_input,
         output_path="-",
+        loglevel="info" if LOGGER.isEnabledFor(VERBOSE_LOG_LEVEL) else "quiet",
     )
-
-    writer_task: asyncio.Task | None = None
-    ffmpeg_proc = AsyncProcess(
-        ffmpeg_args, enable_stdin=use_stdin, enable_stdout=True, enable_stderr=True
-    )
-    await ffmpeg_proc.start()
-
-    # feed stdin with pcm audio chunks from origin
-    async def writer() -> None:
-        async for chunk in audio_input:
-            if ffmpeg_proc.closed:
-                return
-            await ffmpeg_proc.write(chunk)
-        await ffmpeg_proc.write_eof()
-
-    try:
-        if not isinstance(audio_input, str):
-            writer_task = asyncio.create_task(writer())
-
+    async with AsyncProcess(
+        ffmpeg_args,
+        enable_stdin=use_stdin,
+        enable_stdout=True,
+        enable_stderr=False,
+        custom_stdin=audio_input if use_stdin else None,
+        name="player_ffmpeg_stream",
+    ) as ffmpeg_proc:
         # read final chunks from stdout
         chunk_size = chunk_size or get_chunksize(output_format, 1)
         async for chunk in ffmpeg_proc.iter_chunked(chunk_size):
-            try:
-                yield chunk
-            except (BrokenPipeError, ConnectionResetError):
-                # race condition
-                break
-    finally:
-        if writer_task and not writer_task.done():
-            writer_task.cancel()
-        # use communicate to read stderr and wait for exit
-        _, stderr = await ffmpeg_proc.communicate()
-        if ffmpeg_proc.returncode != 0:
-            # ffmpeg has a non zero returncode meaning trouble
-            logger.warning("FFMPEG ERROR\n%s", stderr.decode())
-        else:
-            logger.log(VERBOSE_LOG_LEVEL, stderr.decode())
+            yield chunk
 
 
 async def check_audio_support() -> tuple[bool, bool, str]:
index c3a27314bffc258f6b26dfa1b0deb36672dd4b4b..69845357455e8e96c2b8a7546b082c434658febe 100644 (file)
@@ -6,7 +6,7 @@ import datetime
 from typing import TYPE_CHECKING
 
 from music_assistant.common.models.enums import MediaType
-from music_assistant.constants import MASS_LOGO_ONLINE
+from music_assistant.constants import MASS_LOGO_ONLINE, UGP_PREFIX
 
 if TYPE_CHECKING:
     from music_assistant.common.models.queue_item import QueueItem
@@ -15,28 +15,27 @@ if TYPE_CHECKING:
 # ruff: noqa: E501
 
 
-def create_didl_metadata(
-    mass: MusicAssistant, url: str, queue_item: QueueItem | None = None
-) -> str:
+def create_didl_metadata(mass: MusicAssistant, url: str, queue_item: QueueItem) -> str:
     """Create DIDL metadata string from url and (optional) QueueItem."""
     ext = url.split(".")[-1].split("?")[0]
-    if queue_item is None:
+    if "flow" in url or queue_item.queue_id.startswith(UGP_PREFIX):
+        # flow stream
         return (
             '<DIDL-Lite xmlns:dc="http://purl.org/dc/elements/1.1/" xmlns:upnp="urn:schemas-upnp-org:metadata-1-0/upnp/" xmlns="urn:schemas-upnp-org:metadata-1-0/DIDL-Lite/" xmlns:dlna="urn:schemas-dlna-org:metadata-1-0/">'
             f'<item id="flowmode" parentID="0" restricted="1">'
-            "<dc:title>Music Assistant</dc:title>"
+            f"<dc:title>Music Assistant</dc:title>"
             f"<upnp:albumArtURI>{escape_string(MASS_LOGO_ONLINE)}</upnp:albumArtURI>"
+            f"<dc:queueItemId>{queue_item.queue_id}</dc:queueItemId>"
             "<upnp:class>object.item.audioItem.audioBroadcast</upnp:class>"
             f"<upnp:mimeType>audio/{ext}</upnp:mimeType>"
             f'<res duration="23:59:59.000" protocolInfo="http-get:*:audio/{ext}:DLNA.ORG_PN={ext.upper()};DLNA.ORG_OP=01;DLNA.ORG_CI=0;DLNA.ORG_FLAGS=0d500000000000000000000000000000">{escape_string(url)}</res>'
             "</item>"
             "</DIDL-Lite>"
         )
-    is_radio = queue_item.media_type != MediaType.TRACK or not queue_item.duration
     image_url = (
         mass.metadata.get_image_url(queue_item.image) if queue_item.image else MASS_LOGO_ONLINE
     )
-    if is_radio:
+    if queue_item.media_type != MediaType.TRACK or not queue_item.duration:
         # radio or other non-track item
         return (
             '<DIDL-Lite xmlns:dc="http://purl.org/dc/elements/1.1/" xmlns:upnp="urn:schemas-upnp-org:metadata-1-0/upnp/" xmlns="urn:schemas-upnp-org:metadata-1-0/DIDL-Lite/" xmlns:dlna="urn:schemas-dlna-org:metadata-1-0/">'
index db40ac748d23df48a4d7a34731c0571c271a961c..d4cbc312afcffbd66f2f5c19e1ecb6d3e5eb1ba2 100644 (file)
@@ -40,6 +40,9 @@ class AsyncProcess:
         enable_stdin: bool = False,
         enable_stdout: bool = True,
         enable_stderr: bool = False,
+        custom_stdin: AsyncGenerator[bytes, None] | int | None = None,
+        custom_stdout: int | None = None,
+        name: str | None = None,
     ) -> None:
         """Initialize AsyncProcess."""
         self.proc: asyncio.subprocess.Process | None = None
@@ -48,10 +51,14 @@ class AsyncProcess:
         self._enable_stdout = enable_stdout
         self._enable_stderr = enable_stderr
         self._close_called = False
-        self._stdin_lock = asyncio.Lock()
-        self._stdout_lock = asyncio.Lock()
-        self._stderr_lock = asyncio.Lock()
         self._returncode: bool | None = None
+        self._name = name or self._args[0].split(os.sep)[-1]
+        self.attached_tasks: list[asyncio.Task] = []
+        self._custom_stdin = custom_stdin
+        if not isinstance(custom_stdin, int | None):
+            self._custom_stdin = None
+            self.attached_tasks.append(asyncio.create_task(self._feed_stdin(custom_stdin)))
+        self._custom_stdout = custom_stdout
 
     @property
     def closed(self) -> bool:
@@ -81,22 +88,20 @@ class AsyncProcess:
         """Exit context manager."""
         await self.close()
         self._returncode = self.returncode
-        del self.proc
-        del self._stdin_lock
-        del self._stdout_lock
-        del self._returncode
 
     async def start(self) -> None:
         """Perform Async init of process."""
+        stdin = self._custom_stdin if self._custom_stdin is not None else asyncio.subprocess.PIPE
+        stdout = self._custom_stdout if self._custom_stdout is not None else asyncio.subprocess.PIPE
         self.proc = await asyncio.create_subprocess_exec(
             *self._args,
-            stdin=asyncio.subprocess.PIPE if self._enable_stdin else None,
-            stdout=asyncio.subprocess.PIPE if self._enable_stdout else None,
+            stdin=stdin if self._enable_stdin else None,
+            stdout=stdout if self._enable_stdout else None,
             stderr=asyncio.subprocess.PIPE if self._enable_stderr else None,
-            close_fds=True,
+            limit=4000000,
+            pipesize=256000,
         )
-        proc_name_simple = self._args[0].split(os.sep)[-1]
-        LOGGER.debug("Started %s with PID %s", proc_name_simple, self.proc.pid)
+        LOGGER.debug("Started %s with PID %s", self._name, self.proc.pid)
 
     async def iter_chunked(self, n: int = DEFAULT_CHUNKSIZE) -> AsyncGenerator[bytes, None]:
         """Yield chunks of n size from the process stdout."""
@@ -116,11 +121,8 @@ class AsyncProcess:
 
     async def readexactly(self, n: int) -> bytes:
         """Read exactly n bytes from the process stdout (or less if eof)."""
-        if self._close_called or self.proc.stdout.at_eof():
-            return b""
         try:
-            async with self._stdout_lock:
-                return await self.proc.stdout.readexactly(n)
+            return await self.proc.stdout.readexactly(n)
         except asyncio.IncompleteReadError as err:
             return err.partial
 
@@ -131,36 +133,21 @@ class AsyncProcess:
         and may return less or equal bytes than requested, but at least one byte.
         If EOF was received before any byte is read, this function returns empty byte object.
         """
-        if self._close_called or self.proc.stdout.at_eof():
-            return b""
-        if self.proc.stdout.at_eof():
-            return b""
-        async with self._stdout_lock:
-            return await self.proc.stdout.read(n)
+        return await self.proc.stdout.read(n)
 
     async def write(self, data: bytes) -> None:
         """Write data to process stdin."""
         if self._close_called or self.proc.stdin.is_closing():
-            return
-        if not self.proc or self.proc.returncode is not None:
-            raise RuntimeError("Process not started or already exited")
-        async with self._stdin_lock:
-            self.proc.stdin.write(data)
-            with suppress(BrokenPipeError):
-                await self.proc.stdin.drain()
+            raise asyncio.CancelledError("write called while process already done")
+        self.proc.stdin.write(data)
+        with suppress(BrokenPipeError, ConnectionResetError):
+            await self.proc.stdin.drain()
 
     async def write_eof(self) -> None:
         """Write end of file to to process stdin."""
-        if not self._enable_stdin:
-            raise RuntimeError("STDIN is not enabled")
-        if not self.proc or self.proc.returncode is not None:
-            raise RuntimeError("Process not started or already exited")
-        if self._close_called or self.proc.stdin.is_closing():
-            return
         try:
-            async with self._stdin_lock:
-                if self.proc.stdin.can_write_eof():
-                    self.proc.stdin.write_eof()
+            if self.proc.stdin.can_write_eof():
+                self.proc.stdin.write_eof()
         except (
             AttributeError,
             AssertionError,
@@ -174,20 +161,31 @@ class AsyncProcess:
     async def close(self) -> int:
         """Close/terminate the process and wait for exit."""
         self._close_called = True
-        if self.proc.returncode is None:
+        # close any/all attached (writer) tasks
+        for task in self.attached_tasks:
+            if not task.done():
+                task.cancel()
+                with suppress(asyncio.CancelledError):
+                    await task
+        # send communicate until we exited
+        while self.proc.returncode is None:
             # make sure the process is cleaned up
             try:
                 # we need to use communicate to ensure buffers are flushed
-                await asyncio.wait_for(self.proc.communicate(), 5)
+                await asyncio.wait_for(self.proc.communicate(), 10)
             except TimeoutError:
                 LOGGER.debug(
-                    "Process with PID %s did not stop within 5 seconds. Sending terminate...",
+                    "Process %s with PID %s did not stop in time. Sending terminate...",
+                    self._name,
                     self.proc.pid,
                 )
                 self.proc.terminate()
-                await self.proc.communicate()
+                await asyncio.sleep(0.5)
         LOGGER.debug(
-            "Process with PID %s stopped with returncode %s", self.proc.pid, self.proc.returncode
+            "Process %s with PID %s stopped with returncode %s",
+            self._name,
+            self.proc.pid,
+            self.proc.returncode,
         )
         return self.proc.returncode
 
@@ -199,19 +197,27 @@ class AsyncProcess:
 
     async def communicate(self, input_data: bytes | None = None) -> tuple[bytes, bytes]:
         """Write bytes to process and read back results."""
-        if self.closed:
-            return (b"", b"")
-        async with self._stdout_lock, self._stdin_lock, self._stderr_lock:
-            stdout, stderr = await self.proc.communicate(input_data)
+        stdout, stderr = await self.proc.communicate(input_data)
         return (stdout, stderr)
 
     async def read_stderr(self) -> AsyncGenerator[bytes, None]:
         """Read lines from the stderr stream."""
-        async with self._stderr_lock:
-            async for line in self.proc.stderr:
-                if self.closed:
-                    break
-                yield line
+        async for line in self.proc.stderr:
+            yield line
+
+    async def _feed_stdin(self, custom_stdin: AsyncGenerator[bytes, None]) -> None:
+        """Feed stdin with chunks from an AsyncGenerator."""
+        try:
+            async for chunk in custom_stdin:
+                if self._close_called or self.proc.stdin.is_closing():
+                    return
+                await self.write(chunk)
+            await self.write_eof()
+        except asyncio.CancelledError:
+            # make sure the stdin generator is also properly closed
+            # by propagating a cancellederror within
+            task = asyncio.create_task(custom_stdin.__anext__())
+            task.cancel()
 
 
 async def check_output(shell_cmd: str) -> tuple[int, bytes]:
index 3d9a640404c9d595cc307e48c54ba92b388a121f..89e9a5bdc2a58d09d393c7c3e86b0cbe21444621 100644 (file)
@@ -236,7 +236,7 @@ class AirplayStream:
             "-port",
             str(self.airplay_player.discovery_info.port),
             "-wait",
-            str(2000 - sync_adjust),
+            str(3000 - sync_adjust),
             "-volume",
             str(mass_player.volume_level),
             *extra_args,
@@ -252,15 +252,25 @@ class AirplayStream:
         if platform.system() == "Darwin":
             os.environ["DYLD_LIBRARY_PATH"] = "/usr/local/lib"
 
+        # connect cliraop stdin with ffmpeg stdout using os pipes
+        read, write = os.pipe()
+        # launch ffmpeg, feeding (player specific) audio chunks on stdout
+        self._audio_reader_task = asyncio.create_task(
+            stream_job.stream_to_custom_output_path(
+                player_id=player_id, output_format=AIRPLAY_PCM_FORMAT, output_path=write
+            )
+        )
         self._cliraop_proc = AsyncProcess(
             cliraop_args,
             enable_stdin=True,
             enable_stdout=False,
             enable_stderr=True,
+            custom_stdin=read,
         )
         await self._cliraop_proc.start()
         self._log_reader_task = asyncio.create_task(self._log_watcher())
-        self._audio_reader_task = asyncio.create_task(self._audio_reader())
+
+        # self._audio_reader_task = asyncio.create_task(self._audio_reader())
 
     async def stop(self, wait: bool = True):
         """Stop playback and cleanup."""
@@ -303,36 +313,48 @@ class AirplayStream:
         """Monitor stderr for the running CLIRaop process."""
         airplay_player = self.airplay_player
         mass_player = self.mass.players.get(airplay_player.player_id)
+        queue = self.mass.player_queues.get_active_queue(mass_player.active_source)
         logger = airplay_player.logger
         lost_packets = 0
+        prev_metadata_checksum: str = ""
+        prev_progress_report: float = 0
         async for line in self._cliraop_proc.read_stderr():
             line = line.decode().strip()  # noqa: PLW2901
             if not line:
                 continue
             if "elapsed milliseconds:" in line:
-                # do not log this line, its too verbose
+                # this is received more or less every second while playing
                 millis = int(line.split("elapsed milliseconds: ")[1])
                 mass_player.elapsed_time = millis / 1000
                 mass_player.elapsed_time_last_updated = time.time()
-                continue
+                # send metadata to player(s) if needed
+                # NOTE: this must all be done in separate tasks to not disturb audio
+                now = time.time()
+                if queue and queue.current_item and queue.current_item.streamdetails:
+                    metadata_checksum = (
+                        queue.current_item.streamdetails.stream_title
+                        or queue.current_item.queue_item_id
+                    )
+                    if prev_metadata_checksum != metadata_checksum:
+                        prev_metadata_checksum = metadata_checksum
+                        prev_progress_report = now
+                        self.mass.create_task(self._send_metadata(queue))
+                    # send the progress report every 5 seconds
+                    elif now - prev_progress_report >= 5:
+                        prev_progress_report = now
+                        self.mass.create_task(self._send_progress(queue))
             if "set pause" in line or "Pause at" in line:
-                logger.debug("raop streaming paused")
                 mass_player.state = PlayerState.PAUSED
                 self.mass.players.update(airplay_player.player_id)
-                continue
             if "Restarted at" in line or "restarting w/ pause" in line:
-                logger.debug("raop streaming restarted after pause")
                 mass_player.state = PlayerState.PLAYING
                 self.mass.players.update(airplay_player.player_id)
-                continue
             if "restarting w/o pause" in line:
                 # streaming has started
-                logger.debug("raop streaming started")
                 mass_player.state = PlayerState.PLAYING
                 mass_player.elapsed_time = 0
                 mass_player.elapsed_time_last_updated = time.time()
                 self.mass.players.update(airplay_player.player_id)
-                continue
             if "lost packet out of backlog" in line:
                 lost_packets += 1
                 if lost_packets == 50:
@@ -341,8 +363,7 @@ class AirplayStream:
                     await self.mass.player_queues.stop(queue.queue_id)
                 else:
                     logger.debug(line)
-                continue
-            # verbose log everything else
+
             logger.log(VERBOSE_LOG_LEVEL, line)
 
         # if we reach this point, the process exited
@@ -350,49 +371,6 @@ class AirplayStream:
             mass_player.state = PlayerState.IDLE
             self.mass.players.update(airplay_player.player_id)
 
-    async def _audio_reader(self) -> None:
-        """Send audio chunks to the cliraop process."""
-        logger = self.airplay_player.logger
-        mass_player = self.mass.players.get(self.airplay_player.player_id, True)
-        queue = self.mass.player_queues.get_active_queue(mass_player.active_source)
-        logger.debug(
-            "Starting RAOP stream for Queue %s to %s",
-            queue.display_name,
-            mass_player.display_name,
-        )
-        prev_metadata_checksum: str = ""
-        prev_progress_report: float = 0
-
-        async for chunk in self.stream_job.iter_player_audio(
-            self.airplay_player.player_id, AIRPLAY_PCM_FORMAT
-        ):
-            if self._stop_requested:
-                return
-            await self._cliraop_proc.write(chunk)
-            # send metadata to player(s) if needed
-            # NOTE: this must all be done in separate tasks to not disturb audio
-            now = time.time()
-            if queue and queue.current_item and queue.current_item.streamdetails:
-                metadata_checksum = (
-                    queue.current_item.streamdetails.stream_title
-                    or queue.current_item.queue_item_id
-                )
-                if prev_metadata_checksum != metadata_checksum:
-                    prev_metadata_checksum = metadata_checksum
-                    prev_progress_report = now
-                    self.mass.create_task(self._send_metadata(queue))
-                # send the progress report every 5 seconds
-                elif now - prev_progress_report >= 5:
-                    prev_progress_report = now
-                    self.mass.create_task(self._send_progress(queue))
-        # send EOF
-        await self._cliraop_proc.write_eof()
-        logger.debug(
-            "Finished RAOP stream for Queue %s to %s",
-            queue.display_name,
-            mass_player.display_name,
-        )
-
     async def _send_metadata(self, queue: PlayerQueue) -> None:
         """Send metadata to player (and connected sync childs)."""
         if not self.running:
index 4f7994a1cef33aa5f7219c5737d0b4ddcd90b86e..2181d78dbb8739fdd3d7808ac6c67bfb4426c772 100644 (file)
@@ -360,9 +360,7 @@ class DLNAPlayerProvider(PlayerProvider):
         # always clear queue (by sending stop) first
         if dlna_player.device.can_stop:
             await self.cmd_stop(player_id)
-        didl_metadata = create_didl_metadata(
-            self.mass, url, queue_item if not use_flow_mode else None
-        )
+        didl_metadata = create_didl_metadata(self.mass, url, queue_item)
         title = queue_item.name if queue_item else "Music Assistant"
         await dlna_player.device.async_set_transport_uri(url, title, didl_metadata)
         # Play it
index 0737d52d4aa92813cc6187676e2e9a9471bb8d1a..dc52846d2ae194ec165466f208363868948ef960 100644 (file)
@@ -36,7 +36,6 @@ from music_assistant.common.models.player import DeviceInfo, Player
 from music_assistant.constants import CONF_CROSSFADE, VERBOSE_LOG_LEVEL
 from music_assistant.server.helpers.didl_lite import create_didl_metadata
 from music_assistant.server.models.player_provider import PlayerProvider
-from music_assistant.server.providers.ugp import UGP_PREFIX
 
 from .player import SonosPlayer
 
@@ -352,16 +351,13 @@ class SonosPlayerProvider(PlayerProvider):
             )
             raise PlayerCommandFailed(msg)
 
-        is_flow_stream = queue_item.queue_item_id == "flow" or queue_item.queue_id.startswith(
-            UGP_PREFIX
-        )
         url = self.mass.streams.resolve_stream_url(
-            player_id, queue_item=queue_item, output_codec=ContentType.FLAC
+            player_id,
+            queue_item=queue_item,
+            output_codec=ContentType.FLAC,
         )
         self.mass.create_task(
-            sonos_player.soco.play_uri,
-            url,
-            meta=create_didl_metadata(self.mass, url, None if is_flow_stream else queue_item),
+            sonos_player.soco.play_uri, url, meta=create_didl_metadata(self.mass, url, queue_item)
         )
 
     async def enqueue_next_queue_item(self, player_id: str, queue_item: QueueItem) -> None:
index 1f1d2c731b5f0e5084d9bf86b1fc0c31c5f04585..645e3abbd37cd4b4fea5733e7c37c06cca8c6f2f 100644 (file)
@@ -317,7 +317,7 @@ class MusicAssistant:
         if target is None:
             msg = "Target is missing"
             raise RuntimeError(msg)
-        if existing := self._tracked_tasks.get(task_id):
+        if task_id and (existing := self._tracked_tasks.get(task_id)):
             # prevent duplicate tasks if task_id is given and already present
             return existing
         if asyncio.iscoroutinefunction(target):
@@ -328,19 +328,25 @@ class MusicAssistant:
             task = target
         else:
             # assume normal callable (non coroutine or awaitable)
+            # that needs to be run in the executor
             task = self.loop.create_task(asyncio.to_thread(target, *args, **kwargs))
 
         def task_done_callback(_task: asyncio.Future | asyncio.Task) -> None:
             _task_id = task.task_id
             self._tracked_tasks.pop(_task_id)
-            # print unhandled exceptions
-            if LOGGER.isEnabledFor(logging.DEBUG) and not _task.cancelled() and _task.exception():
-                task_name = _task.get_name() if hasattr(_task, "get_name") else _task
-                LOGGER.exception(
-                    "Exception in task %s - target: %s",
+            # log unhandled exceptions
+            if (
+                LOGGER.isEnabledFor(logging.DEBUG)
+                and not _task.cancelled()
+                and (err := _task.exception())
+            ):
+                task_name = _task.get_name() if hasattr(_task, "get_name") else str(_task)
+                LOGGER.warning(
+                    "Exception in task %s - target: %s: %s",
                     task_name,
                     str(target),
-                    exc_info=task.exception(),
+                    str(err),
+                    exc_info=err if LOGGER.isEnabledFor(logging.DEBUG) else None,
                 )
 
         if task_id is None: