From: Marcel van der Veldt Date: Fri, 29 Mar 2024 01:48:11 +0000 (+0100) Subject: More fixes for playback and better logging (#1189) X-Git-Url: https://git.kitaultman.com/?a=commitdiff_plain;h=f3aa66a7053041331f485944b3839f1dca97f1ab;p=music-assistant-server.git More fixes for playback and better logging (#1189) More fixes for playback --- diff --git a/music_assistant/server/controllers/streams.py b/music_assistant/server/controllers/streams.py index cdbce83f..826d5fd1 100644 --- a/music_assistant/server/controllers/streams.py +++ b/music_assistant/server/controllers/streams.py @@ -10,6 +10,7 @@ from __future__ import annotations import asyncio import logging +import os import time import urllib.parse from collections.abc import AsyncGenerator @@ -17,6 +18,7 @@ from contextlib import suppress from typing import TYPE_CHECKING, Any import shortuuid +from aiofiles.os import wrap from aiohttp import web from music_assistant.common.helpers.util import get_ip, select_free_port, try_parse_bool @@ -78,6 +80,8 @@ FLOW_DEFAULT_BIT_DEPTH = 24 # pylint:disable=too-many-locals +isfile = wrap(os.path.isfile) + class MultiClientStreamJob: """ @@ -999,10 +1003,6 @@ class StreamsController(CoreController): if streamdetails.fade_in: filter_params.append("afade=type=in:start_time=0:duration=3") - audio_source = self.mass.get_provider(streamdetails.provider).get_audio_stream( - streamdetails, - seek_position=streamdetails.seek_position, - ) ffmpeg_args = get_ffmpeg_args( input_format=streamdetails.audio_format, output_format=pcm_format, @@ -1081,12 +1081,12 @@ class StreamsController(CoreController): # cleanup del stderr_data + audio_source = self.mass.get_provider(streamdetails.provider).get_audio_stream( + streamdetails, + seek_position=streamdetails.seek_position, + ) async with AsyncProcess( - ffmpeg_args, - enable_stdin=True, - enable_stderr=True, - custom_stdin=audio_source, - name="ffmpeg_media_stream", + ffmpeg_args, stdin=audio_source, stdout=True, stderr=True, name="ffmpeg_media_stream" ) as ffmpeg_proc: state_data = {"finished": asyncio.Event(), "bytes_sent": 0} logger.debug("start media stream for: %s", streamdetails.uri) diff --git a/music_assistant/server/helpers/audio.py b/music_assistant/server/helpers/audio.py index 177082bd..9ac7fe82 100644 --- a/music_assistant/server/helpers/audio.py +++ b/music_assistant/server/helpers/audio.py @@ -113,7 +113,7 @@ async def crossfade_pcm_parts( fmt, "-", ] - async with AsyncProcess(args, True) as proc: + async with AsyncProcess(args, stdin=True, stdout=True) as proc: crossfade_data, _ = await proc.communicate(fade_in_part) if crossfade_data: LOGGER.log( @@ -170,7 +170,7 @@ async def strip_silence( ] # output args args += ["-f", fmt, "-"] - async with AsyncProcess(args, True) as proc: + async with AsyncProcess(args, stdin=True, stdout=True) as proc: stripped_data, _ = await proc.communicate(audio_data) # return stripped audio @@ -597,7 +597,36 @@ async def get_file_stream( if not streamdetails.size: stat = await asyncio.to_thread(os.stat, filename) streamdetails.size = stat.st_size - chunk_size = get_chunksize(streamdetails.audio_format.content_type) + + # seeking an unknown or container format is not supported due to the (moov) headers + if seek_position and ( + streamdetails.audio_format.content_type + in ( + ContentType.UNKNOWN, + ContentType.M4A, + ContentType.M4B, + ContentType.MP4, + ) + ): + LOGGER.debug( + "Seeking in %s (%s) not possible, fallback to ffmpeg seeking.", + streamdetails.uri, + streamdetails.audio_format.output_format_str, + ) + async for chunk in get_ffmpeg_stream( + filename, + # we must set the input content type to unknown to + # enforce ffmpeg to determine it from the headers + input_format=AudioFormat(content_type=ContentType.UNKNOWN), + # enforce wav as we dont want to re-encode lossy formats + # choose wav so we have descriptive headers and move on + output_format=AudioFormat(content_type=ContentType.WAV), + extra_input_args=["-ss", str(seek_position)], + ): + yield chunk + return + + chunk_size = get_chunksize(streamdetails.audio_format) async with aiofiles.open(streamdetails.data, "rb") as _file: if seek_position: seek_pos = int((streamdetails.size / streamdetails.duration) * seek_position) @@ -617,8 +646,9 @@ async def get_ffmpeg_stream( filter_params: list[str] | None = None, extra_args: list[str] | None = None, chunk_size: int | None = None, - loglevel: str | None = None, + ffmpeg_loglevel: str = "info", extra_input_args: list[str] | None = None, + logger: logging.Logger | None = None, ) -> AsyncGenerator[bytes, None]: """ Get the ffmpeg audio stream as async generator. @@ -626,25 +656,22 @@ async def get_ffmpeg_stream( Takes care of resampling and/or recoding if needed, according to player preferences. """ - if loglevel is None: - loglevel = "info" if LOGGER.isEnabledFor(VERBOSE_LOG_LEVEL) else "fatal" - use_stdin = not isinstance(audio_input, str) ffmpeg_args = get_ffmpeg_args( input_format=input_format, output_format=output_format, filter_params=filter_params or [], extra_args=extra_args or [], - input_path="-" if use_stdin else audio_input, + input_path=audio_input if isinstance(audio_input, str) else "-", output_path="-", - loglevel=loglevel, + loglevel=ffmpeg_loglevel, extra_input_args=extra_input_args or [], ) + stdin = audio_input if not isinstance(audio_input, str) else True async with AsyncProcess( ffmpeg_args, - enable_stdin=use_stdin, - enable_stdout=True, - enable_stderr=False, - custom_stdin=audio_input if use_stdin else None, + stdin=stdin, + stdout=True, + stderr=logger or LOGGER.getChild("ffmpeg_stream"), name="ffmpeg_stream", ) as ffmpeg_proc: # read final chunks from stdout @@ -722,7 +749,7 @@ async def get_silence( output_format.output_format_str, "-", ] - async with AsyncProcess(args) as ffmpeg_proc: + async with AsyncProcess(args, stdout=True) as ffmpeg_proc: async for chunk in ffmpeg_proc.iter_any(): yield chunk @@ -782,7 +809,7 @@ def get_ffmpeg_args( extra_args: list[str] | None = None, input_path: str = "-", output_path: str = "-", - loglevel: str = "fatal", + loglevel: str = "info", extra_input_args: list[str] | None = None, ) -> list[str]: """Collect all args to send to the ffmpeg process.""" @@ -847,7 +874,12 @@ def get_ffmpeg_args( "-i", input_path, ] - elif input_format.content_type == ContentType.UNKNOWN: + elif input_format.content_type in ( + ContentType.UNKNOWN, + ContentType.M4A, + ContentType.M4B, + ContentType.MP4, + ): # let ffmpeg guess/auto detect the content type input_args += ["-i", input_path] else: diff --git a/music_assistant/server/helpers/process.py b/music_assistant/server/helpers/process.py index 6ed4539a..532de822 100644 --- a/music_assistant/server/helpers/process.py +++ b/music_assistant/server/helpers/process.py @@ -11,15 +11,17 @@ from __future__ import annotations import asyncio import logging import os + +# if TYPE_CHECKING: +from collections.abc import AsyncGenerator from contextlib import suppress from signal import SIGINT from types import TracebackType from typing import TYPE_CHECKING -if TYPE_CHECKING: - from collections.abc import AsyncGenerator +from music_assistant.constants import MASS_LOGGER_NAME, VERBOSE_LOG_LEVEL -LOGGER = logging.getLogger(__name__) +LOGGER = logging.getLogger(f"{MASS_LOGGER_NAME}.helpers.process") DEFAULT_CHUNKSIZE = 128000 @@ -38,28 +40,31 @@ class AsyncProcess: def __init__( self, args: list[str], - enable_stdin: bool = False, - enable_stdout: bool = True, - enable_stderr: bool = False, - custom_stdin: AsyncGenerator[bytes, None] | int | None = None, - custom_stdout: int | None = None, + stdin: bool | int | AsyncGenerator[bytes, None] | None = None, + stdout: bool | int | None = None, + stderr: bool | int | logging.Logger | None = None, name: str | None = None, ) -> None: """Initialize AsyncProcess.""" self.proc: asyncio.subprocess.Process | None = None + if isinstance(stderr, logging.Logger): + self._stderr_logger = stderr + stderr = asyncio.subprocess.PIPE + else: + self._stderr_logger = None self._args = args - self._enable_stdin = enable_stdin - self._enable_stdout = enable_stdout - self._enable_stderr = enable_stderr + self._stdin = stdin + self._stdout = stdout + self._stderr = stderr + self._stdin_enabled = stdin not in (None, False) + self._stdout_enabled = stdout not in (None, False) + self._stderr_enabled = stderr not in (None, False) self._close_called = False self._returncode: bool | None = None - self._name = name or self._args[0].split(os.sep)[-1] + if name is None: + name = self._args[0].split(os.sep)[-1] + self.name = name self.attached_tasks: list[asyncio.Task] = [] - self._custom_stdin = custom_stdin - if not isinstance(custom_stdin, int | None): - self._custom_stdin = None - self.attached_tasks.append(asyncio.create_task(self._feed_stdin(custom_stdin))) - self._custom_stdout = custom_stdout @property def closed(self) -> bool: @@ -95,15 +100,29 @@ class AsyncProcess: async def start(self) -> None: """Perform Async init of process.""" - stdin = self._custom_stdin if self._custom_stdin is not None else asyncio.subprocess.PIPE - stdout = self._custom_stdout if self._custom_stdout is not None else asyncio.subprocess.PIPE + if self._stdin is True or isinstance(self._stdin, AsyncGenerator): + stdin = asyncio.subprocess.PIPE + else: + stdin = self._stdin + if self._stdout is True or isinstance(self._stdout, AsyncGenerator): + stdout = asyncio.subprocess.PIPE + else: + stdout = self._stdout + if self._stderr is True or isinstance(self._stderr, AsyncGenerator): + stderr = asyncio.subprocess.PIPE + else: + stderr = self._stderr self.proc = await asyncio.create_subprocess_exec( *self._args, - stdin=stdin if self._enable_stdin else None, - stdout=stdout if self._enable_stdout else None, - stderr=asyncio.subprocess.PIPE if self._enable_stderr else None, + stdin=stdin if self._stdin_enabled else None, + stdout=stdout if self._stdout_enabled else None, + stderr=stderr if self._stderr_enabled else None, ) - LOGGER.debug("Started %s with PID %s", self._name, self.proc.pid) + LOGGER.debug("Process %s started with PID %s", self.name, self.proc.pid) + if not isinstance(self._stdin, int | None): + self.attached_tasks.append(asyncio.create_task(self._feed_stdin())) + if self._stderr_logger: + self.attached_tasks.append(asyncio.create_task(self._read_stderr())) async def iter_chunked(self, n: int = DEFAULT_CHUNKSIZE) -> AsyncGenerator[bytes, None]: """Yield chunks of n size from the process stdout.""" @@ -196,13 +215,13 @@ class AsyncProcess: except TimeoutError: LOGGER.debug( "Process %s with PID %s did not stop in time. Sending terminate...", - self._name, + self.name, self.proc.pid, ) self.proc.terminate() LOGGER.debug( "Process %s with PID %s stopped with returncode %s", - self._name, + self.name, self.proc.pid, self.returncode, ) @@ -240,10 +259,12 @@ class AsyncProcess: # raise for all other (value) errors raise - async def _feed_stdin(self, custom_stdin: AsyncGenerator[bytes, None]) -> None: + async def _feed_stdin(self) -> None: """Feed stdin with chunks from an AsyncGenerator.""" + if TYPE_CHECKING: + self._stdin: AsyncGenerator[bytes, None] try: - async for chunk in custom_stdin: + async for chunk in self._stdin: if self._close_called or self.proc.stdin.is_closing(): return await self.write(chunk) @@ -251,9 +272,22 @@ class AsyncProcess: except asyncio.CancelledError: # make sure the stdin generator is also properly closed # by propagating a cancellederror within - task = asyncio.create_task(custom_stdin.__anext__()) + task = asyncio.create_task(self._stdin.__anext__()) task.cancel() + async def _read_stderr(self) -> None: + """Read stderr and log to logger.""" + async for line in self.iter_stderr(): + line = line.decode().strip() # noqa: PLW2901 + if not line: + continue + if "error" in line.lower(): + self._stderr_logger.error(line) + elif "warning" in line.lower(): + self._stderr_logger.warning(line) + else: + self._stderr_logger.log(VERBOSE_LOG_LEVEL, line) + async def check_output(args: str | list[str]) -> tuple[int, bytes]: """Run subprocess and return output.""" diff --git a/music_assistant/server/helpers/tags.py b/music_assistant/server/helpers/tags.py index e63a82b2..c43545b2 100644 --- a/music_assistant/server/helpers/tags.py +++ b/music_assistant/server/helpers/tags.py @@ -368,9 +368,7 @@ async def parse_tags( ) writer_task: asyncio.Task | None = None - ffmpeg_proc = AsyncProcess( - args, enable_stdin=file_path == "-", enable_stdout=True, enable_stderr=False - ) + ffmpeg_proc = AsyncProcess(args, stdin=file_path == "-", stdout=True) await ffmpeg_proc.start() async def writer() -> None: @@ -441,9 +439,7 @@ async def get_embedded_image(input_file: str | AsyncGenerator[bytes, None]) -> b ) writer_task: asyncio.Task | None = None - ffmpeg_proc = AsyncProcess( - args, enable_stdin=file_path == "-", enable_stdout=True, enable_stderr=False - ) + ffmpeg_proc = AsyncProcess(args, stdin=file_path == "-", stdout=True) await ffmpeg_proc.start() async def writer() -> None: diff --git a/music_assistant/server/providers/airplay/__init__.py b/music_assistant/server/providers/airplay/__init__.py index 0bee8177..1311a6db 100644 --- a/music_assistant/server/providers/airplay/__init__.py +++ b/music_assistant/server/providers/airplay/__init__.py @@ -269,23 +269,16 @@ class AirplayStream: ) self._ffmpeg_proc = AsyncProcess( ffmpeg_args, - enable_stdin=True, - enable_stdout=True, - enable_stderr=False, - custom_stdin=read_from_buffer(), - custom_stdout=write, + stdin=read_from_buffer(), + stdout=write, + stderr="cliraop_ffmpeg", name="cliraop_ffmpeg", ) await self._ffmpeg_proc.start() os.close(write) self._cliraop_proc = AsyncProcess( - cliraop_args, - enable_stdin=True, - enable_stdout=False, - enable_stderr=True, - custom_stdin=read, - name="cliraop", + cliraop_args, stdin=read, stdout=False, stderr=True, name="cliraop" ) await self._cliraop_proc.start() os.close(read) diff --git a/music_assistant/server/providers/filesystem_local/__init__.py b/music_assistant/server/providers/filesystem_local/__init__.py index 86a32d77..5695c40f 100644 --- a/music_assistant/server/providers/filesystem_local/__init__.py +++ b/music_assistant/server/providers/filesystem_local/__init__.py @@ -13,7 +13,9 @@ from aiofiles.os import wrap from music_assistant.common.models.config_entries import ConfigEntry, ConfigValueType from music_assistant.common.models.enums import ConfigEntryType from music_assistant.common.models.errors import SetupFailedError +from music_assistant.common.models.streamdetails import StreamDetails from music_assistant.constants import CONF_PATH +from music_assistant.server.helpers.audio import get_file_stream from .base import ( CONF_ENTRY_MISSING_ALBUM_ARTIST, @@ -167,10 +169,18 @@ class LocalFileSystemProvider(FileSystemProviderBase): abs_path = get_absolute_path(self.base_path, file_path) return await exists(abs_path) + async def get_audio_stream( + self, streamdetails: StreamDetails, seek_position: int = 0 + ) -> AsyncGenerator[bytes, None]: + """Return the audio stream for the provider item.""" + abs_path = get_absolute_path(self.base_path, streamdetails.item_id) + async for chunk in get_file_stream(self.mass, abs_path, streamdetails, seek_position): + yield chunk + async def read_file_content(self, file_path: str, seek: int = 0) -> AsyncGenerator[bytes, None]: """Yield (binary) contents of file in chunks of bytes.""" abs_path = get_absolute_path(self.base_path, file_path) - chunk_size = 512000 + chunk_size = 64000 async with aiofiles.open(abs_path, "rb") as _file: if seek: await _file.seek(seek) diff --git a/music_assistant/server/providers/filesystem_smb/__init__.py b/music_assistant/server/providers/filesystem_smb/__init__.py index dbdb53d8..2ba4ea5d 100644 --- a/music_assistant/server/providers/filesystem_smb/__init__.py +++ b/music_assistant/server/providers/filesystem_smb/__init__.py @@ -4,19 +4,23 @@ from __future__ import annotations import asyncio import platform +from collections.abc import AsyncGenerator from typing import TYPE_CHECKING from music_assistant.common.helpers.util import get_ip_from_host from music_assistant.common.models.config_entries import ConfigEntry, ConfigValueType from music_assistant.common.models.enums import ConfigEntryType from music_assistant.common.models.errors import LoginFailed +from music_assistant.common.models.streamdetails import StreamDetails from music_assistant.constants import CONF_PASSWORD, CONF_USERNAME +from music_assistant.server.helpers.audio import get_file_stream from music_assistant.server.providers.filesystem_local import ( CONF_ENTRY_MISSING_ALBUM_ARTIST, LocalFileSystemProvider, exists, makedirs, ) +from music_assistant.server.providers.filesystem_local.helpers import get_absolute_path if TYPE_CHECKING: from music_assistant.common.models.config_entries import ProviderConfig @@ -154,6 +158,14 @@ class SMBFileSystemProvider(LocalFileSystemProvider): """ await self.unmount() + async def get_audio_stream( + self, streamdetails: StreamDetails, seek_position: int = 0 + ) -> AsyncGenerator[bytes, None]: + """Return the audio stream for the provider item.""" + abs_path = get_absolute_path(self.base_path, streamdetails.item_id) + async for chunk in get_file_stream(self.mass, abs_path, streamdetails, seek_position): + yield chunk + async def mount(self) -> None: """Mount the SMB location to a temporary folder.""" server: str = self.config.get_value(CONF_HOST) diff --git a/music_assistant/server/providers/radiobrowser/__init__.py b/music_assistant/server/providers/radiobrowser/__init__.py index 066f923a..5141a397 100644 --- a/music_assistant/server/providers/radiobrowser/__init__.py +++ b/music_assistant/server/providers/radiobrowser/__init__.py @@ -299,5 +299,5 @@ class RadioBrowserProvider(MusicProvider): ) -> AsyncGenerator[bytes, None]: """Return the audio stream for the provider item.""" # report playback started as soon as we start streaming - async for chunk in get_radio_stream(self.mass, streamdetails.data, streamdetails, 0): + async for chunk in get_radio_stream(self.mass, streamdetails.data, streamdetails): yield chunk diff --git a/music_assistant/server/providers/snapcast/__init__.py b/music_assistant/server/providers/snapcast/__init__.py index 0f0cb81f..041b4582 100644 --- a/music_assistant/server/providers/snapcast/__init__.py +++ b/music_assistant/server/providers/snapcast/__init__.py @@ -369,9 +369,9 @@ class SnapCastProvider(PlayerProvider): try: async with AsyncProcess( ffmpeg_args, - enable_stdin=True, - enable_stdout=False, - enable_stderr=False, + stdin=True, + stdout=False, + stderr=self.logger.getChild("ffmpeg"), name="snapcast_ffmpeg", ) as ffmpeg_proc: async for chunk in audio_source: @@ -498,10 +498,8 @@ class SnapCastProvider(PlayerProvider): "--tcp.enabled=true", "--tcp.port=1705", ] - async with AsyncProcess( - args, enable_stdin=False, enable_stdout=True, enable_stderr=False - ) as snapserver_proc: - # keep reading from stderr until exit + async with AsyncProcess(args, stdin=False, stdout=True, stderr=False) as snapserver_proc: + # keep reading from stdout until exit async for data in snapserver_proc.iter_any(): data = data.decode().strip() # noqa: PLW2901 for line in data.split("\n"): diff --git a/music_assistant/server/providers/spotify/__init__.py b/music_assistant/server/providers/spotify/__init__.py index c9560b40..841980f1 100644 --- a/music_assistant/server/providers/spotify/__init__.py +++ b/music_assistant/server/providers/spotify/__init__.py @@ -432,7 +432,7 @@ class SpotifyProvider(MusicProvider): if self._ap_workaround: args += ["--ap-port", "12345"] bytes_sent = 0 - async with AsyncProcess(args) as librespot_proc: + async with AsyncProcess(args, stdout=True) as librespot_proc: async for chunk in librespot_proc.iter_any(): yield chunk bytes_sent += len(chunk) @@ -442,7 +442,7 @@ class SpotifyProvider(MusicProvider): # https://github.com/librespot-org/librespot/issues/972 # retry with ap-port set to invalid value, which will force fallback args += ["--ap-port", "12345"] - async with AsyncProcess(args) as librespot_proc: + async with AsyncProcess(args, stdout=True) as librespot_proc: async for chunk in librespot_proc.iter_any(): yield chunk self._ap_workaround = True @@ -689,7 +689,7 @@ class SpotifyProvider(MusicProvider): ] if self._ap_workaround: args += ["--ap-port", "12345"] - async with AsyncProcess(args, enable_stdout=True) as librespot: + async with AsyncProcess(args, stdout=True) as librespot: stdout = await librespot.read(-1) if stdout.decode().strip() != "authorized": raise LoginFailed(f"Login failed for username {self.config.get_value(CONF_USERNAME)}") @@ -725,7 +725,7 @@ class SpotifyProvider(MusicProvider): ] if self._ap_workaround: args += ["--ap-port", "12345"] - async with AsyncProcess(args, enable_stdout=True) as librespot: + async with AsyncProcess(args, stdout=True) as librespot: stdout = await librespot.read(-1) duration = round(time.time() - time_start, 2) try: diff --git a/music_assistant/server/providers/tunein/__init__.py b/music_assistant/server/providers/tunein/__init__.py index a7a8369b..af55ab46 100644 --- a/music_assistant/server/providers/tunein/__init__.py +++ b/music_assistant/server/providers/tunein/__init__.py @@ -253,7 +253,7 @@ class TuneInProvider(MusicProvider): ) -> AsyncGenerator[bytes, None]: """Return the audio stream for the provider item.""" # report playback started as soon as we start streaming - async for chunk in get_radio_stream(self.mass, streamdetails.data, streamdetails, 0): + async for chunk in get_radio_stream(self.mass, streamdetails.data, streamdetails): yield chunk async def __get_data(self, endpoint: str, **kwargs):