More fixes for playback and better logging (#1189)
authorMarcel van der Veldt <m.vanderveldt@outlook.com>
Fri, 29 Mar 2024 01:48:11 +0000 (02:48 +0100)
committerGitHub <noreply@github.com>
Fri, 29 Mar 2024 01:48:11 +0000 (02:48 +0100)
More fixes for playback

music_assistant/server/controllers/streams.py
music_assistant/server/helpers/audio.py
music_assistant/server/helpers/process.py
music_assistant/server/helpers/tags.py
music_assistant/server/providers/airplay/__init__.py
music_assistant/server/providers/filesystem_local/__init__.py
music_assistant/server/providers/filesystem_smb/__init__.py
music_assistant/server/providers/radiobrowser/__init__.py
music_assistant/server/providers/snapcast/__init__.py
music_assistant/server/providers/spotify/__init__.py
music_assistant/server/providers/tunein/__init__.py

index cdbce83f51153a09aa469efe9da5c999a0e28081..826d5fd1ed8e2de70d8688be4c267f677bbe81e3 100644 (file)
@@ -10,6 +10,7 @@ from __future__ import annotations
 
 import asyncio
 import logging
+import os
 import time
 import urllib.parse
 from collections.abc import AsyncGenerator
@@ -17,6 +18,7 @@ from contextlib import suppress
 from typing import TYPE_CHECKING, Any
 
 import shortuuid
+from aiofiles.os import wrap
 from aiohttp import web
 
 from music_assistant.common.helpers.util import get_ip, select_free_port, try_parse_bool
@@ -78,6 +80,8 @@ FLOW_DEFAULT_BIT_DEPTH = 24
 
 # pylint:disable=too-many-locals
 
+isfile = wrap(os.path.isfile)
+
 
 class MultiClientStreamJob:
     """
@@ -999,10 +1003,6 @@ class StreamsController(CoreController):
         if streamdetails.fade_in:
             filter_params.append("afade=type=in:start_time=0:duration=3")
 
-        audio_source = self.mass.get_provider(streamdetails.provider).get_audio_stream(
-            streamdetails,
-            seek_position=streamdetails.seek_position,
-        )
         ffmpeg_args = get_ffmpeg_args(
             input_format=streamdetails.audio_format,
             output_format=pcm_format,
@@ -1081,12 +1081,12 @@ class StreamsController(CoreController):
             # cleanup
             del stderr_data
 
+        audio_source = self.mass.get_provider(streamdetails.provider).get_audio_stream(
+            streamdetails,
+            seek_position=streamdetails.seek_position,
+        )
         async with AsyncProcess(
-            ffmpeg_args,
-            enable_stdin=True,
-            enable_stderr=True,
-            custom_stdin=audio_source,
-            name="ffmpeg_media_stream",
+            ffmpeg_args, stdin=audio_source, stdout=True, stderr=True, name="ffmpeg_media_stream"
         ) as ffmpeg_proc:
             state_data = {"finished": asyncio.Event(), "bytes_sent": 0}
             logger.debug("start media stream for: %s", streamdetails.uri)
index 177082bd65e756afdf5e99596d50eea090eeeb83..9ac7fe82a1fd5dd8ccbe8fb1e77c696197f605de 100644 (file)
@@ -113,7 +113,7 @@ async def crossfade_pcm_parts(
         fmt,
         "-",
     ]
-    async with AsyncProcess(args, True) as proc:
+    async with AsyncProcess(args, stdin=True, stdout=True) as proc:
         crossfade_data, _ = await proc.communicate(fade_in_part)
         if crossfade_data:
             LOGGER.log(
@@ -170,7 +170,7 @@ async def strip_silence(
         ]
     # output args
     args += ["-f", fmt, "-"]
-    async with AsyncProcess(args, True) as proc:
+    async with AsyncProcess(args, stdin=True, stdout=True) as proc:
         stripped_data, _ = await proc.communicate(audio_data)
 
     # return stripped audio
@@ -597,7 +597,36 @@ async def get_file_stream(
     if not streamdetails.size:
         stat = await asyncio.to_thread(os.stat, filename)
         streamdetails.size = stat.st_size
-    chunk_size = get_chunksize(streamdetails.audio_format.content_type)
+
+    # seeking an unknown or container format is not supported due to the (moov) headers
+    if seek_position and (
+        streamdetails.audio_format.content_type
+        in (
+            ContentType.UNKNOWN,
+            ContentType.M4A,
+            ContentType.M4B,
+            ContentType.MP4,
+        )
+    ):
+        LOGGER.debug(
+            "Seeking in %s (%s) not possible, fallback to ffmpeg seeking.",
+            streamdetails.uri,
+            streamdetails.audio_format.output_format_str,
+        )
+        async for chunk in get_ffmpeg_stream(
+            filename,
+            # we must set the input content type to unknown to
+            # enforce ffmpeg to determine it from the headers
+            input_format=AudioFormat(content_type=ContentType.UNKNOWN),
+            # enforce wav as we dont want to re-encode lossy formats
+            # choose wav so we have descriptive headers and move on
+            output_format=AudioFormat(content_type=ContentType.WAV),
+            extra_input_args=["-ss", str(seek_position)],
+        ):
+            yield chunk
+        return
+
+    chunk_size = get_chunksize(streamdetails.audio_format)
     async with aiofiles.open(streamdetails.data, "rb") as _file:
         if seek_position:
             seek_pos = int((streamdetails.size / streamdetails.duration) * seek_position)
@@ -617,8 +646,9 @@ async def get_ffmpeg_stream(
     filter_params: list[str] | None = None,
     extra_args: list[str] | None = None,
     chunk_size: int | None = None,
-    loglevel: str | None = None,
+    ffmpeg_loglevel: str = "info",
     extra_input_args: list[str] | None = None,
+    logger: logging.Logger | None = None,
 ) -> AsyncGenerator[bytes, None]:
     """
     Get the ffmpeg audio stream as async generator.
@@ -626,25 +656,22 @@ async def get_ffmpeg_stream(
     Takes care of resampling and/or recoding if needed,
     according to player preferences.
     """
-    if loglevel is None:
-        loglevel = "info" if LOGGER.isEnabledFor(VERBOSE_LOG_LEVEL) else "fatal"
-    use_stdin = not isinstance(audio_input, str)
     ffmpeg_args = get_ffmpeg_args(
         input_format=input_format,
         output_format=output_format,
         filter_params=filter_params or [],
         extra_args=extra_args or [],
-        input_path="-" if use_stdin else audio_input,
+        input_path=audio_input if isinstance(audio_input, str) else "-",
         output_path="-",
-        loglevel=loglevel,
+        loglevel=ffmpeg_loglevel,
         extra_input_args=extra_input_args or [],
     )
+    stdin = audio_input if not isinstance(audio_input, str) else True
     async with AsyncProcess(
         ffmpeg_args,
-        enable_stdin=use_stdin,
-        enable_stdout=True,
-        enable_stderr=False,
-        custom_stdin=audio_input if use_stdin else None,
+        stdin=stdin,
+        stdout=True,
+        stderr=logger or LOGGER.getChild("ffmpeg_stream"),
         name="ffmpeg_stream",
     ) as ffmpeg_proc:
         # read final chunks from stdout
@@ -722,7 +749,7 @@ async def get_silence(
         output_format.output_format_str,
         "-",
     ]
-    async with AsyncProcess(args) as ffmpeg_proc:
+    async with AsyncProcess(args, stdout=True) as ffmpeg_proc:
         async for chunk in ffmpeg_proc.iter_any():
             yield chunk
 
@@ -782,7 +809,7 @@ def get_ffmpeg_args(
     extra_args: list[str] | None = None,
     input_path: str = "-",
     output_path: str = "-",
-    loglevel: str = "fatal",
+    loglevel: str = "info",
     extra_input_args: list[str] | None = None,
 ) -> list[str]:
     """Collect all args to send to the ffmpeg process."""
@@ -847,7 +874,12 @@ def get_ffmpeg_args(
             "-i",
             input_path,
         ]
-    elif input_format.content_type == ContentType.UNKNOWN:
+    elif input_format.content_type in (
+        ContentType.UNKNOWN,
+        ContentType.M4A,
+        ContentType.M4B,
+        ContentType.MP4,
+    ):
         # let ffmpeg guess/auto detect the content type
         input_args += ["-i", input_path]
     else:
index 6ed4539ae4685454742542885bc55250557652fb..532de822a647cda0f10de3f63af4284b402e0d29 100644 (file)
@@ -11,15 +11,17 @@ from __future__ import annotations
 import asyncio
 import logging
 import os
+
+# if TYPE_CHECKING:
+from collections.abc import AsyncGenerator
 from contextlib import suppress
 from signal import SIGINT
 from types import TracebackType
 from typing import TYPE_CHECKING
 
-if TYPE_CHECKING:
-    from collections.abc import AsyncGenerator
+from music_assistant.constants import MASS_LOGGER_NAME, VERBOSE_LOG_LEVEL
 
-LOGGER = logging.getLogger(__name__)
+LOGGER = logging.getLogger(f"{MASS_LOGGER_NAME}.helpers.process")
 
 DEFAULT_CHUNKSIZE = 128000
 
@@ -38,28 +40,31 @@ class AsyncProcess:
     def __init__(
         self,
         args: list[str],
-        enable_stdin: bool = False,
-        enable_stdout: bool = True,
-        enable_stderr: bool = False,
-        custom_stdin: AsyncGenerator[bytes, None] | int | None = None,
-        custom_stdout: int | None = None,
+        stdin: bool | int | AsyncGenerator[bytes, None] | None = None,
+        stdout: bool | int | None = None,
+        stderr: bool | int | logging.Logger | None = None,
         name: str | None = None,
     ) -> None:
         """Initialize AsyncProcess."""
         self.proc: asyncio.subprocess.Process | None = None
+        if isinstance(stderr, logging.Logger):
+            self._stderr_logger = stderr
+            stderr = asyncio.subprocess.PIPE
+        else:
+            self._stderr_logger = None
         self._args = args
-        self._enable_stdin = enable_stdin
-        self._enable_stdout = enable_stdout
-        self._enable_stderr = enable_stderr
+        self._stdin = stdin
+        self._stdout = stdout
+        self._stderr = stderr
+        self._stdin_enabled = stdin not in (None, False)
+        self._stdout_enabled = stdout not in (None, False)
+        self._stderr_enabled = stderr not in (None, False)
         self._close_called = False
         self._returncode: bool | None = None
-        self._name = name or self._args[0].split(os.sep)[-1]
+        if name is None:
+            name = self._args[0].split(os.sep)[-1]
+        self.name = name
         self.attached_tasks: list[asyncio.Task] = []
-        self._custom_stdin = custom_stdin
-        if not isinstance(custom_stdin, int | None):
-            self._custom_stdin = None
-            self.attached_tasks.append(asyncio.create_task(self._feed_stdin(custom_stdin)))
-        self._custom_stdout = custom_stdout
 
     @property
     def closed(self) -> bool:
@@ -95,15 +100,29 @@ class AsyncProcess:
 
     async def start(self) -> None:
         """Perform Async init of process."""
-        stdin = self._custom_stdin if self._custom_stdin is not None else asyncio.subprocess.PIPE
-        stdout = self._custom_stdout if self._custom_stdout is not None else asyncio.subprocess.PIPE
+        if self._stdin is True or isinstance(self._stdin, AsyncGenerator):
+            stdin = asyncio.subprocess.PIPE
+        else:
+            stdin = self._stdin
+        if self._stdout is True or isinstance(self._stdout, AsyncGenerator):
+            stdout = asyncio.subprocess.PIPE
+        else:
+            stdout = self._stdout
+        if self._stderr is True or isinstance(self._stderr, AsyncGenerator):
+            stderr = asyncio.subprocess.PIPE
+        else:
+            stderr = self._stderr
         self.proc = await asyncio.create_subprocess_exec(
             *self._args,
-            stdin=stdin if self._enable_stdin else None,
-            stdout=stdout if self._enable_stdout else None,
-            stderr=asyncio.subprocess.PIPE if self._enable_stderr else None,
+            stdin=stdin if self._stdin_enabled else None,
+            stdout=stdout if self._stdout_enabled else None,
+            stderr=stderr if self._stderr_enabled else None,
         )
-        LOGGER.debug("Started %s with PID %s", self._name, self.proc.pid)
+        LOGGER.debug("Process %s started with PID %s", self.name, self.proc.pid)
+        if not isinstance(self._stdin, int | None):
+            self.attached_tasks.append(asyncio.create_task(self._feed_stdin()))
+        if self._stderr_logger:
+            self.attached_tasks.append(asyncio.create_task(self._read_stderr()))
 
     async def iter_chunked(self, n: int = DEFAULT_CHUNKSIZE) -> AsyncGenerator[bytes, None]:
         """Yield chunks of n size from the process stdout."""
@@ -196,13 +215,13 @@ class AsyncProcess:
             except TimeoutError:
                 LOGGER.debug(
                     "Process %s with PID %s did not stop in time. Sending terminate...",
-                    self._name,
+                    self.name,
                     self.proc.pid,
                 )
                 self.proc.terminate()
         LOGGER.debug(
             "Process %s with PID %s stopped with returncode %s",
-            self._name,
+            self.name,
             self.proc.pid,
             self.returncode,
         )
@@ -240,10 +259,12 @@ class AsyncProcess:
                 # raise for all other (value) errors
                 raise
 
-    async def _feed_stdin(self, custom_stdin: AsyncGenerator[bytes, None]) -> None:
+    async def _feed_stdin(self) -> None:
         """Feed stdin with chunks from an AsyncGenerator."""
+        if TYPE_CHECKING:
+            self._stdin: AsyncGenerator[bytes, None]
         try:
-            async for chunk in custom_stdin:
+            async for chunk in self._stdin:
                 if self._close_called or self.proc.stdin.is_closing():
                     return
                 await self.write(chunk)
@@ -251,9 +272,22 @@ class AsyncProcess:
         except asyncio.CancelledError:
             # make sure the stdin generator is also properly closed
             # by propagating a cancellederror within
-            task = asyncio.create_task(custom_stdin.__anext__())
+            task = asyncio.create_task(self._stdin.__anext__())
             task.cancel()
 
+    async def _read_stderr(self) -> None:
+        """Read stderr and log to logger."""
+        async for line in self.iter_stderr():
+            line = line.decode().strip()  # noqa: PLW2901
+            if not line:
+                continue
+            if "error" in line.lower():
+                self._stderr_logger.error(line)
+            elif "warning" in line.lower():
+                self._stderr_logger.warning(line)
+            else:
+                self._stderr_logger.log(VERBOSE_LOG_LEVEL, line)
+
 
 async def check_output(args: str | list[str]) -> tuple[int, bytes]:
     """Run subprocess and return output."""
index e63a82b28a3892cdc20c01a3bb6ffa97ef5844a3..c43545b28f561ecb2f06f3b88680f57d65463701 100644 (file)
@@ -368,9 +368,7 @@ async def parse_tags(
     )
 
     writer_task: asyncio.Task | None = None
-    ffmpeg_proc = AsyncProcess(
-        args, enable_stdin=file_path == "-", enable_stdout=True, enable_stderr=False
-    )
+    ffmpeg_proc = AsyncProcess(args, stdin=file_path == "-", stdout=True)
     await ffmpeg_proc.start()
 
     async def writer() -> None:
@@ -441,9 +439,7 @@ async def get_embedded_image(input_file: str | AsyncGenerator[bytes, None]) -> b
     )
 
     writer_task: asyncio.Task | None = None
-    ffmpeg_proc = AsyncProcess(
-        args, enable_stdin=file_path == "-", enable_stdout=True, enable_stderr=False
-    )
+    ffmpeg_proc = AsyncProcess(args, stdin=file_path == "-", stdout=True)
     await ffmpeg_proc.start()
 
     async def writer() -> None:
index 0bee81778c8b274ecc23e134599807ce953de612..1311a6db5800ac9f7100a041c5b6bdf728b37190 100644 (file)
@@ -269,23 +269,16 @@ class AirplayStream:
         )
         self._ffmpeg_proc = AsyncProcess(
             ffmpeg_args,
-            enable_stdin=True,
-            enable_stdout=True,
-            enable_stderr=False,
-            custom_stdin=read_from_buffer(),
-            custom_stdout=write,
+            stdin=read_from_buffer(),
+            stdout=write,
+            stderr="cliraop_ffmpeg",
             name="cliraop_ffmpeg",
         )
         await self._ffmpeg_proc.start()
         os.close(write)
 
         self._cliraop_proc = AsyncProcess(
-            cliraop_args,
-            enable_stdin=True,
-            enable_stdout=False,
-            enable_stderr=True,
-            custom_stdin=read,
-            name="cliraop",
+            cliraop_args, stdin=read, stdout=False, stderr=True, name="cliraop"
         )
         await self._cliraop_proc.start()
         os.close(read)
index 86a32d77a29dcac7e45ebbbdfff656476aab51ae..5695c40f4846440ec23149fd8fbae4ffbfb3a2e0 100644 (file)
@@ -13,7 +13,9 @@ from aiofiles.os import wrap
 from music_assistant.common.models.config_entries import ConfigEntry, ConfigValueType
 from music_assistant.common.models.enums import ConfigEntryType
 from music_assistant.common.models.errors import SetupFailedError
+from music_assistant.common.models.streamdetails import StreamDetails
 from music_assistant.constants import CONF_PATH
+from music_assistant.server.helpers.audio import get_file_stream
 
 from .base import (
     CONF_ENTRY_MISSING_ALBUM_ARTIST,
@@ -167,10 +169,18 @@ class LocalFileSystemProvider(FileSystemProviderBase):
         abs_path = get_absolute_path(self.base_path, file_path)
         return await exists(abs_path)
 
+    async def get_audio_stream(
+        self, streamdetails: StreamDetails, seek_position: int = 0
+    ) -> AsyncGenerator[bytes, None]:
+        """Return the audio stream for the provider item."""
+        abs_path = get_absolute_path(self.base_path, streamdetails.item_id)
+        async for chunk in get_file_stream(self.mass, abs_path, streamdetails, seek_position):
+            yield chunk
+
     async def read_file_content(self, file_path: str, seek: int = 0) -> AsyncGenerator[bytes, None]:
         """Yield (binary) contents of file in chunks of bytes."""
         abs_path = get_absolute_path(self.base_path, file_path)
-        chunk_size = 512000
+        chunk_size = 64000
         async with aiofiles.open(abs_path, "rb") as _file:
             if seek:
                 await _file.seek(seek)
index dbdb53d856f327c74cff07ee45022318d60f9c19..2ba4ea5d0090ad29705a36d9605ea1383a838514 100644 (file)
@@ -4,19 +4,23 @@ from __future__ import annotations
 
 import asyncio
 import platform
+from collections.abc import AsyncGenerator
 from typing import TYPE_CHECKING
 
 from music_assistant.common.helpers.util import get_ip_from_host
 from music_assistant.common.models.config_entries import ConfigEntry, ConfigValueType
 from music_assistant.common.models.enums import ConfigEntryType
 from music_assistant.common.models.errors import LoginFailed
+from music_assistant.common.models.streamdetails import StreamDetails
 from music_assistant.constants import CONF_PASSWORD, CONF_USERNAME
+from music_assistant.server.helpers.audio import get_file_stream
 from music_assistant.server.providers.filesystem_local import (
     CONF_ENTRY_MISSING_ALBUM_ARTIST,
     LocalFileSystemProvider,
     exists,
     makedirs,
 )
+from music_assistant.server.providers.filesystem_local.helpers import get_absolute_path
 
 if TYPE_CHECKING:
     from music_assistant.common.models.config_entries import ProviderConfig
@@ -154,6 +158,14 @@ class SMBFileSystemProvider(LocalFileSystemProvider):
         """
         await self.unmount()
 
+    async def get_audio_stream(
+        self, streamdetails: StreamDetails, seek_position: int = 0
+    ) -> AsyncGenerator[bytes, None]:
+        """Return the audio stream for the provider item."""
+        abs_path = get_absolute_path(self.base_path, streamdetails.item_id)
+        async for chunk in get_file_stream(self.mass, abs_path, streamdetails, seek_position):
+            yield chunk
+
     async def mount(self) -> None:
         """Mount the SMB location to a temporary folder."""
         server: str = self.config.get_value(CONF_HOST)
index 066f923a3225e73885bc00b458185e36e0b9628b..5141a39749137ad17869cbbf605f6e2a16f09796 100644 (file)
@@ -299,5 +299,5 @@ class RadioBrowserProvider(MusicProvider):
     ) -> AsyncGenerator[bytes, None]:
         """Return the audio stream for the provider item."""
         # report playback started as soon as we start streaming
-        async for chunk in get_radio_stream(self.mass, streamdetails.data, streamdetails, 0):
+        async for chunk in get_radio_stream(self.mass, streamdetails.data, streamdetails):
             yield chunk
index 0f0cb81fb80e9195f5f4e5d125ecaf2bfb04bfc0..041b458263767a85bf36c15e9a2669858585853f 100644 (file)
@@ -369,9 +369,9 @@ class SnapCastProvider(PlayerProvider):
             try:
                 async with AsyncProcess(
                     ffmpeg_args,
-                    enable_stdin=True,
-                    enable_stdout=False,
-                    enable_stderr=False,
+                    stdin=True,
+                    stdout=False,
+                    stderr=self.logger.getChild("ffmpeg"),
                     name="snapcast_ffmpeg",
                 ) as ffmpeg_proc:
                     async for chunk in audio_source:
@@ -498,10 +498,8 @@ class SnapCastProvider(PlayerProvider):
             "--tcp.enabled=true",
             "--tcp.port=1705",
         ]
-        async with AsyncProcess(
-            args, enable_stdin=False, enable_stdout=True, enable_stderr=False
-        ) as snapserver_proc:
-            # keep reading from stderr until exit
+        async with AsyncProcess(args, stdin=False, stdout=True, stderr=False) as snapserver_proc:
+            # keep reading from stdout until exit
             async for data in snapserver_proc.iter_any():
                 data = data.decode().strip()  # noqa: PLW2901
                 for line in data.split("\n"):
index c9560b407c2f73528bf0a34acff4936616a1abbc..841980f10416fe89e57eba355e18fd8c5c083cfa 100644 (file)
@@ -432,7 +432,7 @@ class SpotifyProvider(MusicProvider):
         if self._ap_workaround:
             args += ["--ap-port", "12345"]
         bytes_sent = 0
-        async with AsyncProcess(args) as librespot_proc:
+        async with AsyncProcess(args, stdout=True) as librespot_proc:
             async for chunk in librespot_proc.iter_any():
                 yield chunk
                 bytes_sent += len(chunk)
@@ -442,7 +442,7 @@ class SpotifyProvider(MusicProvider):
             # https://github.com/librespot-org/librespot/issues/972
             # retry with ap-port set to invalid value, which will force fallback
             args += ["--ap-port", "12345"]
-            async with AsyncProcess(args) as librespot_proc:
+            async with AsyncProcess(args, stdout=True) as librespot_proc:
                 async for chunk in librespot_proc.iter_any():
                     yield chunk
             self._ap_workaround = True
@@ -689,7 +689,7 @@ class SpotifyProvider(MusicProvider):
         ]
         if self._ap_workaround:
             args += ["--ap-port", "12345"]
-        async with AsyncProcess(args, enable_stdout=True) as librespot:
+        async with AsyncProcess(args, stdout=True) as librespot:
             stdout = await librespot.read(-1)
         if stdout.decode().strip() != "authorized":
             raise LoginFailed(f"Login failed for username {self.config.get_value(CONF_USERNAME)}")
@@ -725,7 +725,7 @@ class SpotifyProvider(MusicProvider):
         ]
         if self._ap_workaround:
             args += ["--ap-port", "12345"]
-        async with AsyncProcess(args, enable_stdout=True) as librespot:
+        async with AsyncProcess(args, stdout=True) as librespot:
             stdout = await librespot.read(-1)
         duration = round(time.time() - time_start, 2)
         try:
index a7a8369ba7a9188a944ec322ae5c6855fac0836f..af55ab46d7fabed1c85a853edff2569bc1951c6a 100644 (file)
@@ -253,7 +253,7 @@ class TuneInProvider(MusicProvider):
     ) -> AsyncGenerator[bytes, None]:
         """Return the audio stream for the provider item."""
         # report playback started as soon as we start streaming
-        async for chunk in get_radio_stream(self.mass, streamdetails.data, streamdetails, 0):
+        async for chunk in get_radio_stream(self.mass, streamdetails.data, streamdetails):
             yield chunk
 
     async def __get_data(self, endpoint: str, **kwargs):