Performance and stability fixes (#1180)
authorMarcel van der Veldt <m.vanderveldt@outlook.com>
Mon, 25 Mar 2024 21:57:52 +0000 (22:57 +0100)
committerGitHub <noreply@github.com>
Mon, 25 Mar 2024 21:57:52 +0000 (22:57 +0100)
12 files changed:
.devcontainer/devcontainer.json
.vscode/launch.json
Dockerfile
music_assistant/server/controllers/streams.py
music_assistant/server/helpers/audio.py
music_assistant/server/helpers/process.py
music_assistant/server/models/core_controller.py
music_assistant/server/providers/airplay/__init__.py
music_assistant/server/providers/snapcast/__init__.py
music_assistant/server/providers/spotify/__init__.py
music_assistant/server/providers/url/__init__.py
music_assistant/server/server.py

index ea4b78a25ecec7361008f33242e1a4bdb146f2a2..20a027ed981afa6dd4761351538f86b576106784 100644 (file)
@@ -3,10 +3,8 @@
     "dockerfile": "Dockerfile",
     "context": ".."
   },
-  "features": {
-  },
+  "features": {},
   "postCreateCommand": "./.devcontainer/post-create.sh",
-  "forwardPorts": [
-    8095
-  ]
+  "forwardPorts": [8095, 3483, 9000, 9090],
+  "runArgs": ["--network=host"]
 }
index 5a280cfdeb2523a603cf38d2f16d4edd36432adb..7ea91c4b43b9d50d8f0d0dbf440529bd145e1159 100644 (file)
@@ -6,14 +6,14 @@
     "configurations": [
         {
             "name": "Python: Module",
-            "type": "python",
+            "type": "debugpy",
             "request": "launch",
             "module": "music_assistant",
             "justMyCode": false,
             "args":[
                  "--log-level", "debug"
             ],
-            // "env": {"PYTHONDEVMODE": "1"}
+            "env": {"PYTHONDEVMODE": "1"}
         }
     ]
 }
index 7fea4f1919db795d48646b9f9319a9d46a0f36a7..939f01a8598f7765e620f7764f287742f575e865 100644 (file)
@@ -46,6 +46,9 @@ ARG TARGETPLATFORM
 RUN set -x \
     # add bookworm backports repo
     && sh -c 'echo "deb http://deb.debian.org/debian bookworm-backports main" >> /etc/apt/sources.list' \
+    # add multimedia repo
+    && sh -c 'echo "Types: deb\nURIs: https://www.deb-multimedia.org\nSuites: stable\nComponents: main non-free\nSigned-By: /etc/apt/trusted.gpg.d/deb-multimedia-keyring.gpg" >> /etc/apt/sources.list.d/deb-multimedia.sources' \
+    && sh -c 'echo "Package: *\nPin: origin www.deb-multimedia.org\nPin-Priority: 1" >> /etc/apt/preferences.d/99deb-multimedia' \
     && apt-get update \
     && apt-get install -y --no-install-recommends \
         ca-certificates \
@@ -53,7 +56,6 @@ RUN set -x \
         git \
         wget \
         tzdata \
-        ffmpeg \
         libsox-fmt-all \
         libsox3 \
         sox \
@@ -62,6 +64,11 @@ RUN set -x \
         libjemalloc2 \
     # install snapcast server 0.27 from bookworm backports
     && apt-get install -y --no-install-recommends -t bookworm-backports snapserver \
+    # install ffmpeg 6 from multimedia repo
+    && cd /tmp && curl -sLO https://www.deb-multimedia.org/pool/main/d/deb-multimedia-keyring/deb-multimedia-keyring_2016.8.1_all.deb \
+    && apt install -y /tmp/deb-multimedia-keyring_2016.8.1_all.deb \
+    && apt-get update \
+    && apt install -y -t 'o=Unofficial Multimedia Packages' ffmpeg \
     # cleanup
     && rm -rf /tmp/* \
     && rm -rf /var/lib/apt/lists/*
index a1887daf9706b1f68203d207d415aca6afef7383..d713115862f9041c406e40efd131315c53704444 100644 (file)
@@ -37,11 +37,11 @@ from music_assistant.constants import (
     CONF_CROSSFADE_DURATION,
     CONF_OUTPUT_CHANNELS,
     CONF_PUBLISH_IP,
+    ROOT_LOGGER_NAME,
     SILENCE_FILE,
     UGP_PREFIX,
     VERBOSE_LOG_LEVEL,
 )
-from music_assistant.server.helpers.audio import LOGGER as AUDIO_LOGGER
 from music_assistant.server.helpers.audio import (
     check_audio_support,
     crossfade_pcm_parts,
@@ -362,7 +362,7 @@ class StreamsController(CoreController):
             "with libsoxr support" if libsoxr_support else "",
         )
         # copy log level to audio module
-        AUDIO_LOGGER.setLevel(self.logger.level)
+        logging.getLogger(f"{ROOT_LOGGER_NAME}.audio").setLevel(self.logger.level)
         # start the webserver
         self.publish_port = config.get_value(CONF_BIND_PORT)
         self.publish_ip = config.get_value(CONF_PUBLISH_IP)
@@ -828,8 +828,9 @@ class StreamsController(CoreController):
                 queue.queue_id, CONF_CROSSFADE_DURATION, 8
             )
             crossfade_size = int(pcm_sample_size * crossfade_duration)
-            buffer_size = int(pcm_sample_size * 5)  # 5 seconds
+            buffer_size = int(pcm_sample_size * 2)  # 2 seconds
             if use_crossfade:
+                # buffer size needs to be big enough to include the crossfade part
                 buffer_size += crossfade_size
             bytes_written = 0
             buffer = b""
@@ -859,9 +860,10 @@ class StreamsController(CoreController):
                         pcm_format.bit_depth,
                         pcm_format.sample_rate,
                     )
-                    # send crossfade_part
-                    yield crossfade_part
+                    # send crossfade_part (as one big chunk)
                     bytes_written += len(crossfade_part)
+                    yield crossfade_part
+
                     # also write the leftover bytes from the crossfade action
                     if remaining_bytes:
                         yield remaining_bytes
@@ -873,9 +875,11 @@ class StreamsController(CoreController):
 
                 #### OTHER: enough data in buffer, feed to output
                 while len(buffer) > buffer_size:
-                    yield buffer[:pcm_sample_size]
-                    bytes_written += pcm_sample_size
+                    subchunk = buffer[:pcm_sample_size]
                     buffer = buffer[pcm_sample_size:]
+                    bytes_written += len(subchunk)
+                    yield subchunk
+                    del subchunk
 
             #### HANDLE END OF TRACK
             if last_fadeout_part:
@@ -891,9 +895,10 @@ class StreamsController(CoreController):
                 bytes_written += len(remaining_bytes)
                 del remaining_bytes
             else:
-                # no crossfade enabled, just yield the (entire) buffer last part
-                yield buffer
+                # no crossfade enabled, just yield the buffer last part
                 bytes_written += len(buffer)
+                yield buffer
+                del buffer
 
             # update duration details based on the actual pcm data we sent
             # this also accounts for crossfade and silence stripping
@@ -914,7 +919,7 @@ class StreamsController(CoreController):
         if last_fadeout_part:
             yield last_fadeout_part
             del last_fadeout_part
-        del buffer
+
         self.logger.info("Finished Queue Flow stream for Queue %s", queue.display_name)
 
     async def get_announcement_stream(
@@ -960,10 +965,10 @@ class StreamsController(CoreController):
         is_radio = streamdetails.media_type == MediaType.RADIO or not streamdetails.duration
         if is_radio or streamdetails.seek_position:
             strip_silence_begin = False
-        # chunk size = 2 seconds of pcm audio
-        pcm_sample_size = int(pcm_format.sample_rate * (pcm_format.bit_depth / 8) * 2)
-        chunk_size = pcm_sample_size * (1 if is_radio else 2)
-        expected_chunks = int((streamdetails.duration or 0) / 2)
+        # chunk size = 1 second of pcm audio
+        pcm_sample_size = pcm_format.pcm_sample_size
+        chunk_size = pcm_sample_size  # chunk size = sample size  (= 1 second)
+        expected_chunks = int(((streamdetails.duration or 0) * pcm_sample_size) / chunk_size)
         if expected_chunks < 10:
             strip_silence_end = False
 
@@ -980,7 +985,7 @@ class StreamsController(CoreController):
             extra_args += ["-ss", str(seek_pos)]
         if streamdetails.target_loudness is not None:
             # add loudnorm filters
-            filter_rule = f"loudnorm=I={streamdetails.target_loudness}:LRA=7:tp=-2:offset=-0.5"
+            filter_rule = f"loudnorm=I={streamdetails.target_loudness}:LRA=11:TP=-2"
             if streamdetails.loudness:
                 filter_rule += f":measured_I={streamdetails.loudness.integrated}"
                 filter_rule += f":measured_LRA={streamdetails.loudness.lra}"
@@ -1013,6 +1018,7 @@ class StreamsController(CoreController):
             input_path=input_path,
             # loglevel info is needed for loudness measurement
             loglevel="info",
+            extra_input_args=["-filter_threads", "1"],
         )
 
         async def log_reader(ffmpeg_proc: AsyncProcess, state_data: dict[str, Any]):
@@ -1068,10 +1074,6 @@ class StreamsController(CoreController):
                 if music_prov := self.mass.get_provider(streamdetails.provider):
                     self.mass.create_task(music_prov.on_streamed(streamdetails, seconds_streamed))
 
-            # cleanup
-            del state_data
-            del ffmpeg_proc
-
         async with AsyncProcess(
             ffmpeg_args,
             enable_stdin=audio_source_iterator is not None,
@@ -1116,8 +1118,7 @@ class StreamsController(CoreController):
 
                 # collect this chunk for next round
                 prev_chunk = chunk
-
-            # we did not receive any data, somethinh wet wrong
+            # if we did not receive any data, something went (terribly) wrong
             # raise here to prevent an endless loop elsewhere
             if state_data["bytes_sent"] == 0:
                 raise AudioError(f"stream error on {streamdetails.uri}")
@@ -1134,7 +1135,11 @@ class StreamsController(CoreController):
             else:
                 final_chunk = prev_chunk
 
-            # yield final chunk to output
+            # yield final chunk to output (in chunk_size parts)
+            while len(final_chunk) > chunk_size:
+                yield final_chunk[:chunk_size]
+                final_chunk = final_chunk[chunk_size:]
+                state_data["bytes_sent"] += len(final_chunk)
             yield final_chunk
             state_data["bytes_sent"] += len(final_chunk)
             state_data["finished"].set()
index 18ff1fd0a73f70d9ae96f134ffaffa27bbe4d2de..a1e1828ccfe627dd18984c03af2c0d1c6db6d349 100644 (file)
@@ -12,7 +12,7 @@ from time import time
 from typing import TYPE_CHECKING
 
 import aiofiles
-from aiohttp import ClientError, ClientResponseError, ClientTimeout
+from aiohttp import ClientResponseError, ClientTimeout
 
 from music_assistant.common.helpers.global_cache import (
     get_global_cache_value,
@@ -380,24 +380,19 @@ async def get_radio_stream(
 ) -> AsyncGenerator[bytes, None]:
     """Get radio audio stream from HTTP, including metadata retrieval."""
     resolved_url, supports_icy, is_hls = await resolve_radio_stream(mass, url)
-    retries = 0
-    while True:
-        try:
-            retries += 1
-            if is_hls:  # special HLS stream
-                async for chunk in get_hls_stream(mass, resolved_url, streamdetails):
-                    yield chunk
-            elif supports_icy:  # http stream supports icy metadata
-                async for chunk in get_icy_stream(mass, resolved_url, streamdetails):
-                    yield chunk
-            else:  # generic http stream (without icy metadata)
-                async for chunk in get_http_stream(mass, resolved_url, streamdetails):
-                    yield chunk
-        except ClientError:
-            LOGGER.warning("Streaming radio %s failed, retrying...", streamdetails.uri)
-            if retries >= 5:
-                raise
-            await asyncio.sleep(1 * retries)
+    # handle special HLS stream
+    if is_hls:
+        async for chunk in get_hls_stream(mass, resolved_url, streamdetails):
+            yield chunk
+        return
+    # handle http stream supports icy metadata
+    if supports_icy:
+        async for chunk in get_icy_stream(mass, resolved_url, streamdetails):
+            yield chunk
+        return
+    # generic http stream (without icy metadata)
+    async for chunk in get_http_stream(mass, resolved_url, streamdetails):
+        yield chunk
 
 
 async def get_icy_stream(
@@ -495,18 +490,16 @@ async def get_hls_stream(
         "Start streaming HLS stream for url %s (selected substream %s)", url, substream_url
     )
 
-    input_format = streamdetails.audio_format
-    output_format = streamdetails.audio_format
     if streamdetails.audio_format.content_type == ContentType.UNKNOWN:
         streamdetails.audio_format = AudioFormat(content_type=ContentType.AAC)
-        output_format = AudioFormat(content_type=ContentType.FLAC)
 
     try:
         metadata_task = asyncio.create_task(watch_metadata())
         async for chunk in get_ffmpeg_stream(
             audio_input=substream_url,
-            input_format=input_format,
-            output_format=output_format,
+            input_format=streamdetails.audio_format,
+            # we need a self-describing codec but must not lose data from re-encoding
+            output_format=AudioFormat(content_type=ContentType.FLAC),
         ):
             yield chunk
     finally:
@@ -625,7 +618,7 @@ async def get_ffmpeg_stream(
         enable_stdout=True,
         enable_stderr=False,
         custom_stdin=audio_input if use_stdin else None,
-        name="player_ffmpeg_stream",
+        name="ffmpeg_stream",
     ) as ffmpeg_proc:
         # read final chunks from stdout
         chunk_size = chunk_size or get_chunksize(output_format, 1)
@@ -799,6 +792,7 @@ def get_ffmpeg_args(
     input_path: str = "-",
     output_path: str = "-",
     loglevel: str = "info",
+    extra_input_args: list[str] | None = None,
 ) -> list[str]:
     """Collect all args to send to the ffmpeg process."""
     if extra_args is None:
@@ -824,16 +818,13 @@ def get_ffmpeg_args(
         "-ignore_unknown",
         "-protocol_whitelist",
         "file,http,https,tcp,tls,crypto,pipe,data,fd",
+        "-filter_complex_threads",
+        "1",
     ]
     # collect input args
-    input_args = [
-        "-ac",
-        str(input_format.channels),
-        "-channel_layout",
-        "mono" if input_format.channels == 1 else "stereo",
-    ]
-    if input_format.content_type.is_pcm():
-        input_args += ["-ar", str(input_format.sample_rate)]
+    input_args = []
+    if extra_input_args:
+        input_args += extra_input_args
     if input_path.startswith("http"):
         # append reconnect options for direct stream from http
         input_args += [
@@ -852,15 +843,24 @@ def get_ffmpeg_args(
                 "-reconnect_on_http_error",
                 "5xx",
             ]
-    if input_format.content_type != ContentType.UNKNOWN:
-        input_args += ["-f", input_format.content_type.value]
+    if input_format.content_type.is_pcm():
+        input_args += [
+            "-ac",
+            str(input_format.channels),
+            "-channel_layout",
+            "mono" if input_format.channels == 1 else "stereo",
+            "-ar",
+            str(input_format.sample_rate),
+            "-acodec",
+            input_format.content_type.name.lower(),
+            "-f",
+            input_format.content_type.value,
+        ]
     input_args += ["-i", input_path]
 
     # collect output args
     if output_path.upper() == "NULL":
         output_args = ["-f", "null", "-"]
-    elif output_format.content_type == ContentType.UNKNOWN:
-        output_args = [output_path]
     else:
         output_args = [
             "-acodec",
index 299e410de5377d9bf5a03d79c368be1d45c83cdd..080fbe99e6b5c0be99cef52e367b26a759c95295 100644 (file)
@@ -60,7 +60,6 @@ class AsyncProcess:
             self._custom_stdin = None
             self.attached_tasks.append(asyncio.create_task(self._feed_stdin(custom_stdin)))
         self._custom_stdout = custom_stdout
-        self._stderr_locked = asyncio.Lock()
 
     @property
     def closed(self) -> bool:
@@ -74,7 +73,9 @@ class AsyncProcess:
             return self._returncode
         if self.proc is None:
             return None
-        return self.proc.returncode
+        if (ret_code := self.proc.returncode) is not None:
+            self._returncode = ret_code
+        return ret_code
 
     async def __aenter__(self) -> AsyncProcess:
         """Enter context manager."""
@@ -88,7 +89,8 @@ class AsyncProcess:
         exc_tb: TracebackType | None,
     ) -> bool | None:
         """Exit context manager."""
-        await self.close()
+        # send interrupt signal to process when we're cancelled
+        await self.close(send_signal=exc_type in (GeneratorExit, asyncio.CancelledError))
         self._returncode = self.returncode
 
     async def start(self) -> None:
@@ -105,7 +107,7 @@ class AsyncProcess:
 
     async def iter_chunked(self, n: int = DEFAULT_CHUNKSIZE) -> AsyncGenerator[bytes, None]:
         """Yield chunks of n size from the process stdout."""
-        while True:
+        while self.returncode is None:
             chunk = await self.readexactly(n)
             if len(chunk) == 0:
                 break
@@ -113,7 +115,7 @@ class AsyncProcess:
 
     async def iter_any(self, n: int = DEFAULT_CHUNKSIZE) -> AsyncGenerator[bytes, None]:
         """Yield chunks as they come in from process stdout."""
-        while True:
+        while self.returncode is None:
             chunk = await self.read(n)
             if len(chunk) == 0:
                 break
@@ -121,6 +123,8 @@ class AsyncProcess:
 
     async def readexactly(self, n: int) -> bytes:
         """Read exactly n bytes from the process stdout (or less if eof)."""
+        if not self.proc.stdout or self.proc.stdout.at_eof():
+            return b""
         try:
             return await self.proc.stdout.readexactly(n)
         except asyncio.IncompleteReadError as err:
@@ -133,11 +137,13 @@ class AsyncProcess:
         and may return less or equal bytes than requested, but at least one byte.
         If EOF was received before any byte is read, this function returns empty byte object.
         """
+        if not self.proc.stdout or self.proc.stdout.at_eof():
+            return b""
         return await self.proc.stdout.read(n)
 
     async def write(self, data: bytes) -> None:
         """Write data to process stdin."""
-        if self._close_called or self.proc.stdin.is_closing():
+        if self.returncode is not None or self.proc.stdin.is_closing():
             raise asyncio.CancelledError("write called while process already done")
         self.proc.stdin.write(data)
         with suppress(BrokenPipeError, ConnectionResetError):
@@ -145,6 +151,8 @@ class AsyncProcess:
 
     async def write_eof(self) -> None:
         """Write end of file to the process stdin."""
+        if self.returncode is not None or self.proc.stdin.is_closing():
+            return
         try:
             if self.proc.stdin.can_write_eof():
                 self.proc.stdin.write_eof()
@@ -158,7 +166,7 @@ class AsyncProcess:
             # already exited, race condition
             pass
 
-    async def close(self) -> int:
+    async def close(self, send_signal: bool = False) -> int:
         """Close/terminate the process and wait for exit."""
         self._close_called = True
         # close any/all attached (writer) tasks
@@ -167,27 +175,26 @@ class AsyncProcess:
                 task.cancel()
                 with suppress(asyncio.CancelledError):
                     await task
-
-        if self.proc.returncode is None:
-            # always first try to send sigint signal to try clean shutdown
-            # for example ffmpeg needs this to cleanly shutdown and not lock on pipes
+        if send_signal and self.returncode is None:
             self.proc.send_signal(SIGINT)
-            # allow the process a little bit of time to respond to the signal
-            await asyncio.sleep(0.1)
+            # allow the process a bit of time to respond to the signal before we go nuclear
+            await asyncio.sleep(0.5)
 
-        # send communicate until we exited
-        while self.proc.returncode is None:
-            # make sure the process is really cleaned up.
-            # especially with pipes this can cause deadlocks if not properly guarded
-            # we need to use communicate to ensure buffers are flushed
-            # we do that with sending communicate
-            if self._enable_stdin and not self.proc.stdin.is_closing():
-                self.proc.stdin.close()
+        # make sure the process is really cleaned up.
+        # especially with pipes this can cause deadlocks if not properly guarded
+        # we need to ensure stdout and stderr are flushed and stdin closed
+        while self.returncode is None:
             try:
-                if self.proc.stdout and self._stderr_locked.locked():
-                    await asyncio.wait_for(self.proc.stdout.read(), 5)
-                else:
-                    await asyncio.wait_for(self.proc.communicate(), 5)
+                async with asyncio.timeout(30):
+                    # abort existing readers on stderr/stdout first before we send communicate
+                    if self.proc.stdout and self.proc.stdout._waiter is not None:
+                        self.proc.stdout._waiter.set_exception(asyncio.CancelledError())
+                        self.proc.stdout._waiter = None
+                    if self.proc.stderr and self.proc.stderr._waiter is not None:
+                        self.proc.stderr._waiter.set_exception(asyncio.CancelledError())
+                        self.proc.stderr._waiter = None
+                    # use communicate to flush all pipe buffers
+                    await self.proc.communicate()
             except TimeoutError:
                 LOGGER.debug(
                     "Process %s with PID %s did not stop in time. Sending terminate...",
@@ -199,27 +206,32 @@ class AsyncProcess:
             "Process %s with PID %s stopped with returncode %s",
             self._name,
             self.proc.pid,
-            self.proc.returncode,
+            self.returncode,
         )
-        return self.proc.returncode
+        return self.returncode
 
     async def wait(self) -> int:
         """Wait for the process and return the returncode."""
         if self.returncode is not None:
             return self.returncode
-        return await self.proc.wait()
+        self._returncode = await self.proc.wait()
+        return self._returncode
 
     async def communicate(self, input_data: bytes | None = None) -> tuple[bytes, bytes]:
         """Write bytes to process and read back results."""
         stdout, stderr = await self.proc.communicate(input_data)
+        self._returncode = self.proc.returncode
         return (stdout, stderr)
 
     async def iter_stderr(self) -> AsyncGenerator[bytes, None]:
         """Iterate lines from the stderr stream."""
-        while not self.closed:
+        while self.returncode is None:
+            if self.proc.stderr.at_eof():
+                break
             try:
-                async with self._stderr_locked:
-                    yield await self.proc.stderr.readline()
+                yield await self.proc.stderr.readline()
+                if self.proc.stderr.at_eof():
+                    break
             except ValueError as err:
                 # we're waiting for a line (separator found), but the line was too big
                 # this may happen with ffmpeg during a long (radio) stream where progress
index 00bf37c3d575bdda16dddc0e55b5289f300417e5..36c24e31182e821ce2c5eb8ae0570de268e10ba3 100644 (file)
@@ -70,6 +70,8 @@ class CoreController:
             )
         if log_level == "GLOBAL":
             self.logger.setLevel(mass_logger.level)
-        elif logging.getLogger().level > self.logger.level:
+        else:
+            self.logger.setLevel(log_level)
+        if logging.getLogger().level > self.logger.level:
             # if the root logger's level is higher, we need to adjust that too
             logging.getLogger().setLevel(self.logger.level)
index d8448fca256bf43c76e9aff4e02b862ac79df9ca..5768c12debb86af666c162b1051de1156016c2fd 100644 (file)
@@ -61,7 +61,8 @@ CONF_ALAC_ENCODE = "alac_encode"
 CONF_VOLUME_START = "volume_start"
 CONF_PASSWORD = "password"
 
-REQUIRED_BUFFER = int(44100 * (16 / 8) * 2) * 10  # 10 seconds
+# the output buffer to raop must be big enough to prevent small hiccups
+REQUIRED_BUFFER = int(44100 * (16 / 8) * 2) * 3  # 3 seconds
 
 
 PLAYER_CONFIG_ENTRIES = (
@@ -196,20 +197,12 @@ class AirplayStream:
         self.active_remote_id: str = str(randint(1000, 8000))
         self.start_ntp: int | None = None  # use as checksum
         self.prevent_playback: bool = False
+        self.running = True
         self._log_reader_task: asyncio.Task | None = None
         self._audio_reader_task: asyncio.Task | None = None
         self._cliraop_proc: AsyncProcess | None = None
         self._ffmpeg_proc: AsyncProcess | None = None
-        self._stop_requested = False
-
-    @property
-    def running(self) -> bool:
-        """Return bool if we're running."""
-        return (
-            not self._stop_requested
-            and self._cliraop_proc
-            and self._cliraop_proc.returncode is None
-        )
+        self._buffer = asyncio.Queue(10)
 
     async def start(self, start_ntp: int) -> None:
         """Initialize CLIRaop process for a player."""
@@ -241,7 +234,7 @@ class AirplayStream:
             "-port",
             str(self.airplay_player.discovery_info.port),
             "-wait",
-            str(2000 - sync_adjust),
+            str(2500 - sync_adjust),
             "-volume",
             str(mass_player.volume_level),
             *extra_args,
@@ -261,6 +254,16 @@ class AirplayStream:
         # one could argue that the intermediate ffmpeg towards cliraop is not needed
         # when there are no player specific filters or extras but in this case
         # ffmpeg serves as a small buffer towards the realtime cliraop streamer
+        read, write = os.pipe()
+
+        async def read_from_buffer() -> AsyncGenerator[bytes, None]:
+            while True:
+                next_chunk = await self._buffer.get()
+                if not next_chunk:
+                    break
+                yield next_chunk
+                del next_chunk
+
         ffmpeg_args = get_ffmpeg_args(
             input_format=self.input_format,
             output_format=AIRPLAY_PCM_FORMAT,
@@ -272,35 +275,40 @@ class AirplayStream:
             enable_stdin=True,
             enable_stdout=True,
             enable_stderr=False,
+            custom_stdin=read_from_buffer(),
+            custom_stdout=write,
             name="cliraop_ffmpeg",
         )
         await self._ffmpeg_proc.start()
+        os.close(write)
+
         self._cliraop_proc = AsyncProcess(
             cliraop_args,
             enable_stdin=True,
             enable_stdout=False,
             enable_stderr=True,
-            custom_stdin=self._audio_feeder(),
+            custom_stdin=read,
             name="cliraop",
         )
         await self._cliraop_proc.start()
+        os.close(read)
         self._log_reader_task = asyncio.create_task(self._log_watcher())
 
     async def stop(self, wait: bool = True):
         """Stop playback and cleanup."""
-        if self._cliraop_proc.closed and self._ffmpeg_proc.closed:
-            return
-        self._stop_requested = True
+        self.running = False
 
         async def _stop() -> None:
             # ffmpeg MUST be stopped before cliraop due to the chained pipes
-            await self._ffmpeg_proc.close()
+            if not self._ffmpeg_proc.closed:
+                await self._ffmpeg_proc.close(True)
             # allow the cliraop process to stop gracefully first
-            await self.send_cli_command("ACTION=STOP")
-            with suppress(TimeoutError):
-                await asyncio.wait_for(self._cliraop_proc.wait(), 5)
+            if not self._cliraop_proc.closed:
+                await self.send_cli_command("ACTION=STOP")
+                with suppress(TimeoutError):
+                    await asyncio.wait_for(self._cliraop_proc.wait(), 5)
             # send regular close anyway (which also logs the returncode)
-            await self._cliraop_proc.close()
+            await self._cliraop_proc.close(True)
 
         task = self.mass.create_task(_stop())
         if wait:
@@ -308,13 +316,13 @@ class AirplayStream:
 
     async def write_chunk(self, chunk: bytes) -> None:
         """Write a (pcm) audio chunk to ffmpeg."""
-        await self._ffmpeg_proc.write(chunk)
+        await self._buffer.put(chunk)
 
     async def write_eof(self) -> None:
         """Write EOF to the ffmpeg stdin."""
-        await self._ffmpeg_proc.write_eof()
-        await self._ffmpeg_proc.wait()
-        await self.stop()
+        if not self.running:
+            return
+        await self._buffer.put(b"")
 
     async def send_cli_command(self, command: str) -> None:
         """Send an interactive command to the running CLIRaop binary."""
@@ -380,7 +388,7 @@ class AirplayStream:
                 self.mass.players.update(airplay_player.player_id)
             if "lost packet out of backlog" in line:
                 lost_packets += 1
-                if lost_packets == 100:
+                if lost_packets == 50:
                     logger.error("High packet loss detected, stopping playback...")
                     await self.stop(False)
                 elif lost_packets % 10 == 0:
@@ -389,16 +397,16 @@ class AirplayStream:
             logger.log(VERBOSE_LOG_LEVEL, line)
 
         # if we reach this point, the process exited
+        self.running = False
         if airplay_player.active_stream == self:
             mass_player.state = PlayerState.IDLE
             self.mass.players.update(airplay_player.player_id)
         # ensure we're cleaned up afterwards
-        await self.stop()
+        if self._ffmpeg_proc.returncode is None or self._cliraop_proc.returncode is None:
+            await self.stop()
 
     async def _send_metadata(self, queue: PlayerQueue) -> None:
         """Send metadata to player (and connected sync childs)."""
-        if not self.running:
-            return
         if not queue or not queue.current_item:
             return
         duration = min(queue.current_item.duration or 0, 3600)
@@ -437,31 +445,11 @@ class AirplayStream:
 
     async def _send_progress(self, queue: PlayerQueue) -> None:
         """Send progress report to player (and connected sync childs)."""
-        if not self.running:
-            return
         if not queue or not queue.current_item:
             return
         progress = int(queue.corrected_elapsed_time)
         await self.send_cli_command(f"PROGRESS={progress}\n")
 
-    async def _audio_feeder(self) -> AsyncGenerator[bytes, None]:
-        """Read chunks from ffmpeg and feed (buffered) to cliraop."""
-        buffer = b""
-        async for chunk in self._ffmpeg_proc.iter_any():
-            if self._stop_requested:
-                break
-            buffer += chunk
-            chunksize = len(chunk)
-            del chunk
-            while len(buffer) > REQUIRED_BUFFER:
-                yield buffer[:chunksize]
-                buffer = buffer[chunksize:]
-        # end of stream
-        if not self._stop_requested:
-            yield buffer
-            await self._cliraop_proc.write_eof()
-        del buffer
-
 
 @dataclass
 class AirPlayPlayer:
@@ -666,18 +654,17 @@ class AirplayProvider(PlayerProvider):
         start_ntp = int(stdout.strip())
 
         # setup Raop process for player and its sync childs
-        async with asyncio.TaskGroup() as tg:
-            for airplay_player in self._get_sync_clients(player_id):
-                airplay_player.active_stream = AirplayStream(
-                    self, airplay_player, input_format=input_format
-                )
-                tg.create_task(airplay_player.active_stream.start(start_ntp))
+        for airplay_player in self._get_sync_clients(player_id):
+            airplay_player.active_stream = AirplayStream(
+                self, airplay_player, input_format=input_format
+            )
+            self.mass.create_task(airplay_player.active_stream.start(start_ntp))
 
         async for chunk in audio_source:
             active_clients = 0
             async with asyncio.TaskGroup() as tg:
                 for airplay_player in self._get_sync_clients(player_id):
-                    if not (airplay_player.active_stream and airplay_player.active_stream.running):
+                    if not airplay_player.active_stream or not airplay_player.active_stream.running:
                         # player stopped or switched to a new stream
                         continue
                     if airplay_player.active_stream.start_ntp != start_ntp:
@@ -689,14 +676,9 @@ class AirplayProvider(PlayerProvider):
             if active_clients == 0:
                 # no more clients
                 return
-        # entire stream consumed: send EOF
+        # entire stream consumed: send EOF (empty chunk)
         async with asyncio.TaskGroup() as tg:
             for airplay_player in self._get_sync_clients(player_id):
-                if (
-                    not airplay_player.active_stream
-                    or airplay_player.active_stream.start_ntp != start_ntp
-                ):
-                    continue
                 tg.create_task(airplay_player.active_stream.write_eof())
 
     async def cmd_volume_set(self, player_id: str, volume_level: int) -> None:
index 0a903ca44c4a9d9b1cb9e4c93f2c0d160dd17d99..6698b803aeb1f29024bb9ad667b4277799173143 100644 (file)
@@ -330,7 +330,7 @@ class SnapCastProvider(PlayerProvider):
             input_format = DEFAULT_SNAPCAST_FORMAT
             audio_source = self.mass.streams.get_announcement_stream(
                 queue_item.streamdetails.data["url"],
-                pcm_format=DEFAULT_SNAPCAST_FORMAT,
+                output_format=DEFAULT_SNAPCAST_FORMAT,
                 use_pre_announce=queue_item.streamdetails.data["use_pre_announce"],
             )
         else:
index 23fe6b3bf517da2fc58b9fddff1d91fa10dcc5d8..c9560b407c2f73528bf0a34acff4936616a1abbc 100644 (file)
@@ -637,10 +637,10 @@ class SpotifyProvider(MusicProvider):
             try:
                 retries += 1
                 if not tokeninfo:
-                    async with asyncio.timeout(5):
+                    async with asyncio.timeout(10):
                         tokeninfo = await self._get_token()
                 if tokeninfo and not userinfo:
-                    async with asyncio.timeout(5):
+                    async with asyncio.timeout(10):
                         userinfo = await self._get_data("me", tokeninfo=tokeninfo)
                 if tokeninfo and userinfo:
                     # we have all info we need!
@@ -689,10 +689,8 @@ class SpotifyProvider(MusicProvider):
         ]
         if self._ap_workaround:
             args += ["--ap-port", "12345"]
-        librespot = await asyncio.create_subprocess_exec(
-            *args, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.STDOUT
-        )
-        stdout, _ = await librespot.communicate()
+        async with AsyncProcess(args, enable_stdout=True) as librespot:
+            stdout = await librespot.read(-1)
         if stdout.decode().strip() != "authorized":
             raise LoginFailed(f"Login failed for username {self.config.get_value(CONF_USERNAME)}")
         # get token with (authorized) librespot
@@ -727,10 +725,8 @@ class SpotifyProvider(MusicProvider):
         ]
         if self._ap_workaround:
             args += ["--ap-port", "12345"]
-        librespot = await asyncio.create_subprocess_exec(
-            *args, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.STDOUT
-        )
-        stdout, _ = await librespot.communicate()
+        async with AsyncProcess(args, enable_stdout=True) as librespot:
+            stdout = await librespot.read(-1)
         duration = round(time.time() - time_start, 2)
         try:
             result = json.loads(stdout)
index c3ec5dd72ccba15bf389616406f6f38b995b4879..bffb8cbcaefab6af84db16c4d53b6a22f2973b17 100644 (file)
@@ -152,14 +152,14 @@ class URLProvider(MusicProvider):
             media_item = Radio(
                 item_id=url,
                 provider=self.domain,
-                name=media_info.get("icy-name") or media_info.title,
+                name=media_info.get("icy-name") or url,
                 provider_mappings=provider_mappings,
             )
         else:
             media_item = Track(
                 item_id=url,
                 provider=self.domain,
-                name=media_info.title,
+                name=media_info.title or url,
                 duration=int(media_info.duration or 0),
                 artists=[await self.get_artist(artist) for artist in media_info.artists],
                 provider_mappings=provider_mappings,
index 78398cf9bdeddb43778d8fc0d2d302298b86e976..e4ff1a3545643edcb62b8d0a5b3c8bd7a9bf767b 100644 (file)
@@ -262,9 +262,9 @@ class MusicAssistant:
         if self.closing:
             return
 
-        if LOGGER.isEnabledFor(logging.DEBUG) and event != EventType.QUEUE_TIME_UPDATED:
+        if LOGGER.isEnabledFor(VERBOSE_LOG_LEVEL):
             # do not log queue time updated events because that is too chatty
-            LOGGER.getChild("event").debug("%s %s", event.value, object_id or "")
+            LOGGER.getChild("event").log(VERBOSE_LOG_LEVEL, "%s %s", event.value, object_id or "")
 
         event_obj = MassEvent(event=event, object_id=object_id, data=data)
         for cb_func, event_filter, id_filter in self._subscribers:
@@ -362,13 +362,13 @@ class MusicAssistant:
         *args: Any,
         task_id: str | None = None,
         **kwargs: Any,
-    ) -> asyncio.Task | asyncio.Future:
+    ) -> asyncio.TimerHandle:
         """Run callable/awaitable after given delay."""
 
         def _create_task() -> None:
             self.create_task(target, *args, task_id=task_id, **kwargs)
 
-        self.loop.call_later(delay, _create_task)
+        return self.loop.call_later(delay, _create_task)
 
     def get_task(self, task_id: str) -> asyncio.Task | asyncio.Future:
         """Get existing scheduled task."""