import asyncio
import logging
+import os
import time
import urllib.parse
from collections.abc import AsyncGenerator
from typing import TYPE_CHECKING, Any
import shortuuid
+from aiofiles.os import wrap
from aiohttp import web
from music_assistant.common.helpers.util import get_ip, select_free_port, try_parse_bool
# pylint:disable=too-many-locals
+isfile = wrap(os.path.isfile)
+
class MultiClientStreamJob:
"""
if streamdetails.fade_in:
filter_params.append("afade=type=in:start_time=0:duration=3")
- audio_source = self.mass.get_provider(streamdetails.provider).get_audio_stream(
- streamdetails,
- seek_position=streamdetails.seek_position,
- )
ffmpeg_args = get_ffmpeg_args(
input_format=streamdetails.audio_format,
output_format=pcm_format,
# cleanup
del stderr_data
+ audio_source = self.mass.get_provider(streamdetails.provider).get_audio_stream(
+ streamdetails,
+ seek_position=streamdetails.seek_position,
+ )
async with AsyncProcess(
- ffmpeg_args,
- enable_stdin=True,
- enable_stderr=True,
- custom_stdin=audio_source,
- name="ffmpeg_media_stream",
+ ffmpeg_args, stdin=audio_source, stdout=True, stderr=True, name="ffmpeg_media_stream"
) as ffmpeg_proc:
state_data = {"finished": asyncio.Event(), "bytes_sent": 0}
logger.debug("start media stream for: %s", streamdetails.uri)
fmt,
"-",
]
- async with AsyncProcess(args, True) as proc:
+ async with AsyncProcess(args, stdin=True, stdout=True) as proc:
crossfade_data, _ = await proc.communicate(fade_in_part)
if crossfade_data:
LOGGER.log(
]
# output args
args += ["-f", fmt, "-"]
- async with AsyncProcess(args, True) as proc:
+ async with AsyncProcess(args, stdin=True, stdout=True) as proc:
stripped_data, _ = await proc.communicate(audio_data)
# return stripped audio
if not streamdetails.size:
stat = await asyncio.to_thread(os.stat, filename)
streamdetails.size = stat.st_size
- chunk_size = get_chunksize(streamdetails.audio_format.content_type)
+
+ # seeking an unknown or container format is not supported due to the (moov) headers
+ if seek_position and (
+ streamdetails.audio_format.content_type
+ in (
+ ContentType.UNKNOWN,
+ ContentType.M4A,
+ ContentType.M4B,
+ ContentType.MP4,
+ )
+ ):
+ LOGGER.debug(
+ "Seeking in %s (%s) not possible, fallback to ffmpeg seeking.",
+ streamdetails.uri,
+ streamdetails.audio_format.output_format_str,
+ )
+ async for chunk in get_ffmpeg_stream(
+ filename,
+ # we must set the input content type to unknown to
+ # enforce ffmpeg to determine it from the headers
+ input_format=AudioFormat(content_type=ContentType.UNKNOWN),
+ # enforce wav as we dont want to re-encode lossy formats
+ # choose wav so we have descriptive headers and move on
+ output_format=AudioFormat(content_type=ContentType.WAV),
+ extra_input_args=["-ss", str(seek_position)],
+ ):
+ yield chunk
+ return
+
+ chunk_size = get_chunksize(streamdetails.audio_format)
async with aiofiles.open(streamdetails.data, "rb") as _file:
if seek_position:
seek_pos = int((streamdetails.size / streamdetails.duration) * seek_position)
filter_params: list[str] | None = None,
extra_args: list[str] | None = None,
chunk_size: int | None = None,
- loglevel: str | None = None,
+ ffmpeg_loglevel: str = "info",
extra_input_args: list[str] | None = None,
+ logger: logging.Logger | None = None,
) -> AsyncGenerator[bytes, None]:
"""
Get the ffmpeg audio stream as async generator.
Takes care of resampling and/or recoding if needed,
according to player preferences.
"""
- if loglevel is None:
- loglevel = "info" if LOGGER.isEnabledFor(VERBOSE_LOG_LEVEL) else "fatal"
- use_stdin = not isinstance(audio_input, str)
ffmpeg_args = get_ffmpeg_args(
input_format=input_format,
output_format=output_format,
filter_params=filter_params or [],
extra_args=extra_args or [],
- input_path="-" if use_stdin else audio_input,
+ input_path=audio_input if isinstance(audio_input, str) else "-",
output_path="-",
- loglevel=loglevel,
+ loglevel=ffmpeg_loglevel,
extra_input_args=extra_input_args or [],
)
+ stdin = audio_input if not isinstance(audio_input, str) else True
async with AsyncProcess(
ffmpeg_args,
- enable_stdin=use_stdin,
- enable_stdout=True,
- enable_stderr=False,
- custom_stdin=audio_input if use_stdin else None,
+ stdin=stdin,
+ stdout=True,
+ stderr=logger or LOGGER.getChild("ffmpeg_stream"),
name="ffmpeg_stream",
) as ffmpeg_proc:
# read final chunks from stdout
output_format.output_format_str,
"-",
]
- async with AsyncProcess(args) as ffmpeg_proc:
+ async with AsyncProcess(args, stdout=True) as ffmpeg_proc:
async for chunk in ffmpeg_proc.iter_any():
yield chunk
extra_args: list[str] | None = None,
input_path: str = "-",
output_path: str = "-",
- loglevel: str = "fatal",
+ loglevel: str = "info",
extra_input_args: list[str] | None = None,
) -> list[str]:
"""Collect all args to send to the ffmpeg process."""
"-i",
input_path,
]
- elif input_format.content_type == ContentType.UNKNOWN:
+ elif input_format.content_type in (
+ ContentType.UNKNOWN,
+ ContentType.M4A,
+ ContentType.M4B,
+ ContentType.MP4,
+ ):
# let ffmpeg guess/auto detect the content type
input_args += ["-i", input_path]
else:
import asyncio
import logging
import os
+
+# if TYPE_CHECKING:
+from collections.abc import AsyncGenerator
from contextlib import suppress
from signal import SIGINT
from types import TracebackType
from typing import TYPE_CHECKING
-if TYPE_CHECKING:
- from collections.abc import AsyncGenerator
+from music_assistant.constants import MASS_LOGGER_NAME, VERBOSE_LOG_LEVEL
-LOGGER = logging.getLogger(__name__)
+LOGGER = logging.getLogger(f"{MASS_LOGGER_NAME}.helpers.process")
DEFAULT_CHUNKSIZE = 128000
def __init__(
self,
args: list[str],
- enable_stdin: bool = False,
- enable_stdout: bool = True,
- enable_stderr: bool = False,
- custom_stdin: AsyncGenerator[bytes, None] | int | None = None,
- custom_stdout: int | None = None,
+ stdin: bool | int | AsyncGenerator[bytes, None] | None = None,
+ stdout: bool | int | None = None,
+ stderr: bool | int | logging.Logger | None = None,
name: str | None = None,
) -> None:
"""Initialize AsyncProcess."""
self.proc: asyncio.subprocess.Process | None = None
+ if isinstance(stderr, logging.Logger):
+ self._stderr_logger = stderr
+ stderr = asyncio.subprocess.PIPE
+ else:
+ self._stderr_logger = None
self._args = args
- self._enable_stdin = enable_stdin
- self._enable_stdout = enable_stdout
- self._enable_stderr = enable_stderr
+ self._stdin = stdin
+ self._stdout = stdout
+ self._stderr = stderr
+ self._stdin_enabled = stdin not in (None, False)
+ self._stdout_enabled = stdout not in (None, False)
+ self._stderr_enabled = stderr not in (None, False)
self._close_called = False
self._returncode: bool | None = None
- self._name = name or self._args[0].split(os.sep)[-1]
+ if name is None:
+ name = self._args[0].split(os.sep)[-1]
+ self.name = name
self.attached_tasks: list[asyncio.Task] = []
- self._custom_stdin = custom_stdin
- if not isinstance(custom_stdin, int | None):
- self._custom_stdin = None
- self.attached_tasks.append(asyncio.create_task(self._feed_stdin(custom_stdin)))
- self._custom_stdout = custom_stdout
@property
def closed(self) -> bool:
async def start(self) -> None:
"""Perform Async init of process."""
- stdin = self._custom_stdin if self._custom_stdin is not None else asyncio.subprocess.PIPE
- stdout = self._custom_stdout if self._custom_stdout is not None else asyncio.subprocess.PIPE
+ if self._stdin is True or isinstance(self._stdin, AsyncGenerator):
+ stdin = asyncio.subprocess.PIPE
+ else:
+ stdin = self._stdin
+ if self._stdout is True or isinstance(self._stdout, AsyncGenerator):
+ stdout = asyncio.subprocess.PIPE
+ else:
+ stdout = self._stdout
+ if self._stderr is True or isinstance(self._stderr, AsyncGenerator):
+ stderr = asyncio.subprocess.PIPE
+ else:
+ stderr = self._stderr
self.proc = await asyncio.create_subprocess_exec(
*self._args,
- stdin=stdin if self._enable_stdin else None,
- stdout=stdout if self._enable_stdout else None,
- stderr=asyncio.subprocess.PIPE if self._enable_stderr else None,
+ stdin=stdin if self._stdin_enabled else None,
+ stdout=stdout if self._stdout_enabled else None,
+ stderr=stderr if self._stderr_enabled else None,
)
- LOGGER.debug("Started %s with PID %s", self._name, self.proc.pid)
+ LOGGER.debug("Process %s started with PID %s", self.name, self.proc.pid)
+ if not isinstance(self._stdin, int | None):
+ self.attached_tasks.append(asyncio.create_task(self._feed_stdin()))
+ if self._stderr_logger:
+ self.attached_tasks.append(asyncio.create_task(self._read_stderr()))
async def iter_chunked(self, n: int = DEFAULT_CHUNKSIZE) -> AsyncGenerator[bytes, None]:
"""Yield chunks of n size from the process stdout."""
except TimeoutError:
LOGGER.debug(
"Process %s with PID %s did not stop in time. Sending terminate...",
- self._name,
+ self.name,
self.proc.pid,
)
self.proc.terminate()
LOGGER.debug(
"Process %s with PID %s stopped with returncode %s",
- self._name,
+ self.name,
self.proc.pid,
self.returncode,
)
# raise for all other (value) errors
raise
- async def _feed_stdin(self, custom_stdin: AsyncGenerator[bytes, None]) -> None:
+ async def _feed_stdin(self) -> None:
"""Feed stdin with chunks from an AsyncGenerator."""
+ if TYPE_CHECKING:
+ self._stdin: AsyncGenerator[bytes, None]
try:
- async for chunk in custom_stdin:
+ async for chunk in self._stdin:
if self._close_called or self.proc.stdin.is_closing():
return
await self.write(chunk)
except asyncio.CancelledError:
# make sure the stdin generator is also properly closed
# by propagating a cancellederror within
- task = asyncio.create_task(custom_stdin.__anext__())
+ task = asyncio.create_task(self._stdin.__anext__())
task.cancel()
+ async def _read_stderr(self) -> None:
+ """Read stderr and log to logger."""
+ async for line in self.iter_stderr():
+ line = line.decode().strip() # noqa: PLW2901
+ if not line:
+ continue
+ if "error" in line.lower():
+ self._stderr_logger.error(line)
+ elif "warning" in line.lower():
+ self._stderr_logger.warning(line)
+ else:
+ self._stderr_logger.log(VERBOSE_LOG_LEVEL, line)
+
async def check_output(args: str | list[str]) -> tuple[int, bytes]:
"""Run subprocess and return output."""
)
writer_task: asyncio.Task | None = None
- ffmpeg_proc = AsyncProcess(
- args, enable_stdin=file_path == "-", enable_stdout=True, enable_stderr=False
- )
+ ffmpeg_proc = AsyncProcess(args, stdin=file_path == "-", stdout=True)
await ffmpeg_proc.start()
async def writer() -> None:
)
writer_task: asyncio.Task | None = None
- ffmpeg_proc = AsyncProcess(
- args, enable_stdin=file_path == "-", enable_stdout=True, enable_stderr=False
- )
+ ffmpeg_proc = AsyncProcess(args, stdin=file_path == "-", stdout=True)
await ffmpeg_proc.start()
async def writer() -> None:
)
self._ffmpeg_proc = AsyncProcess(
ffmpeg_args,
- enable_stdin=True,
- enable_stdout=True,
- enable_stderr=False,
- custom_stdin=read_from_buffer(),
- custom_stdout=write,
+ stdin=read_from_buffer(),
+ stdout=write,
+ stderr="cliraop_ffmpeg",
name="cliraop_ffmpeg",
)
await self._ffmpeg_proc.start()
os.close(write)
self._cliraop_proc = AsyncProcess(
- cliraop_args,
- enable_stdin=True,
- enable_stdout=False,
- enable_stderr=True,
- custom_stdin=read,
- name="cliraop",
+ cliraop_args, stdin=read, stdout=False, stderr=True, name="cliraop"
)
await self._cliraop_proc.start()
os.close(read)
from music_assistant.common.models.config_entries import ConfigEntry, ConfigValueType
from music_assistant.common.models.enums import ConfigEntryType
from music_assistant.common.models.errors import SetupFailedError
+from music_assistant.common.models.streamdetails import StreamDetails
from music_assistant.constants import CONF_PATH
+from music_assistant.server.helpers.audio import get_file_stream
from .base import (
CONF_ENTRY_MISSING_ALBUM_ARTIST,
abs_path = get_absolute_path(self.base_path, file_path)
return await exists(abs_path)
+ async def get_audio_stream(
+ self, streamdetails: StreamDetails, seek_position: int = 0
+ ) -> AsyncGenerator[bytes, None]:
+ """Return the audio stream for the provider item."""
+ abs_path = get_absolute_path(self.base_path, streamdetails.item_id)
+ async for chunk in get_file_stream(self.mass, abs_path, streamdetails, seek_position):
+ yield chunk
+
async def read_file_content(self, file_path: str, seek: int = 0) -> AsyncGenerator[bytes, None]:
"""Yield (binary) contents of file in chunks of bytes."""
abs_path = get_absolute_path(self.base_path, file_path)
- chunk_size = 512000
+ chunk_size = 64000
async with aiofiles.open(abs_path, "rb") as _file:
if seek:
await _file.seek(seek)
import asyncio
import platform
+from collections.abc import AsyncGenerator
from typing import TYPE_CHECKING
from music_assistant.common.helpers.util import get_ip_from_host
from music_assistant.common.models.config_entries import ConfigEntry, ConfigValueType
from music_assistant.common.models.enums import ConfigEntryType
from music_assistant.common.models.errors import LoginFailed
+from music_assistant.common.models.streamdetails import StreamDetails
from music_assistant.constants import CONF_PASSWORD, CONF_USERNAME
+from music_assistant.server.helpers.audio import get_file_stream
from music_assistant.server.providers.filesystem_local import (
CONF_ENTRY_MISSING_ALBUM_ARTIST,
LocalFileSystemProvider,
exists,
makedirs,
)
+from music_assistant.server.providers.filesystem_local.helpers import get_absolute_path
if TYPE_CHECKING:
from music_assistant.common.models.config_entries import ProviderConfig
"""
await self.unmount()
+ async def get_audio_stream(
+ self, streamdetails: StreamDetails, seek_position: int = 0
+ ) -> AsyncGenerator[bytes, None]:
+ """Return the audio stream for the provider item."""
+ abs_path = get_absolute_path(self.base_path, streamdetails.item_id)
+ async for chunk in get_file_stream(self.mass, abs_path, streamdetails, seek_position):
+ yield chunk
+
async def mount(self) -> None:
"""Mount the SMB location to a temporary folder."""
server: str = self.config.get_value(CONF_HOST)
) -> AsyncGenerator[bytes, None]:
"""Return the audio stream for the provider item."""
# report playback started as soon as we start streaming
- async for chunk in get_radio_stream(self.mass, streamdetails.data, streamdetails, 0):
+ async for chunk in get_radio_stream(self.mass, streamdetails.data, streamdetails):
yield chunk
try:
async with AsyncProcess(
ffmpeg_args,
- enable_stdin=True,
- enable_stdout=False,
- enable_stderr=False,
+ stdin=True,
+ stdout=False,
+ stderr=self.logger.getChild("ffmpeg"),
name="snapcast_ffmpeg",
) as ffmpeg_proc:
async for chunk in audio_source:
"--tcp.enabled=true",
"--tcp.port=1705",
]
- async with AsyncProcess(
- args, enable_stdin=False, enable_stdout=True, enable_stderr=False
- ) as snapserver_proc:
- # keep reading from stderr until exit
+ async with AsyncProcess(args, stdin=False, stdout=True, stderr=False) as snapserver_proc:
+ # keep reading from stdout until exit
async for data in snapserver_proc.iter_any():
data = data.decode().strip() # noqa: PLW2901
for line in data.split("\n"):
if self._ap_workaround:
args += ["--ap-port", "12345"]
bytes_sent = 0
- async with AsyncProcess(args) as librespot_proc:
+ async with AsyncProcess(args, stdout=True) as librespot_proc:
async for chunk in librespot_proc.iter_any():
yield chunk
bytes_sent += len(chunk)
# https://github.com/librespot-org/librespot/issues/972
# retry with ap-port set to invalid value, which will force fallback
args += ["--ap-port", "12345"]
- async with AsyncProcess(args) as librespot_proc:
+ async with AsyncProcess(args, stdout=True) as librespot_proc:
async for chunk in librespot_proc.iter_any():
yield chunk
self._ap_workaround = True
]
if self._ap_workaround:
args += ["--ap-port", "12345"]
- async with AsyncProcess(args, enable_stdout=True) as librespot:
+ async with AsyncProcess(args, stdout=True) as librespot:
stdout = await librespot.read(-1)
if stdout.decode().strip() != "authorized":
raise LoginFailed(f"Login failed for username {self.config.get_value(CONF_USERNAME)}")
]
if self._ap_workaround:
args += ["--ap-port", "12345"]
- async with AsyncProcess(args, enable_stdout=True) as librespot:
+ async with AsyncProcess(args, stdout=True) as librespot:
stdout = await librespot.read(-1)
duration = round(time.time() - time_start, 2)
try:
) -> AsyncGenerator[bytes, None]:
"""Return the audio stream for the provider item."""
# report playback started as soon as we start streaming
- async for chunk in get_radio_stream(self.mass, streamdetails.data, streamdetails, 0):
+ async for chunk in get_radio_stream(self.mass, streamdetails.data, streamdetails):
yield chunk
async def __get_data(self, endpoint: str, **kwargs):