return possible_bool in ["true", "True", "1", "on", "ON", 1]
+def try_parse_duration(duration_str: str) -> float:
+ """Try to parse a duration in seconds from a HH:MM:SS(.ms) formatted string."""
+ milliseconds = float("0." + duration_str.split(".")[-1]) if "." in duration_str else 0.0
+ duration_parts = duration_str.split(".")[0].split(",")[0].split(":")
+ if len(duration_parts) == 3:
+ seconds = sum(x * int(t) for x, t in zip([3600, 60, 1], duration_parts, strict=False))
+ elif len(duration_parts) == 2:
+ seconds = sum(x * int(t) for x, t in zip([60, 1], duration_parts, strict=False))
+ else:
+ seconds = int(duration_parts[0])
+ return seconds + milliseconds
+
+
def create_sort_name(input_str: str) -> str:
"""Create sort name/title from string."""
input_str = input_str.lower().strip()
true_peak: float
lra: float
threshold: float
+ target_offset: float | None = None
@dataclass(kw_only=True)
# can_seek: bool to indicate that the providers 'get_audio_stream' supports seeking of the item
can_seek: bool = True
- # stream_type:
-
# the fields below will be set/controlled by the streamcontroller
seek_position: int = 0
fade_in: bool = False
API_SCHEMA_VERSION: Final[int] = 24
MIN_SCHEMA_VERSION: Final[int] = 24
-DB_SCHEMA_VERSION: Final[int] = 28
+DB_SCHEMA_VERSION: Final[int] = 29
MASS_LOGGER_NAME: Final[str] = "music_assistant"
if db_row["expires"] < cur_timestamp:
await self.delete(db_row["key"])
cleaned_records += 1
+ await asyncio.sleep(0) # yield to eventloop
if cleaned_records > 50:
self.logger.debug("Compacting database...")
await self.database.vacuum()
# overhead of grabbing the musicbrainz id upfront
library_item = await self.update_item_in_library(db_item.item_id, item)
break
+ await asyncio.sleep(0) # yield to eventloop
if not library_item:
# actually add (or update) the item in the library db
# use the lock to prevent a race condition of the same item being added twice
ProviderUnavailableError,
UnsupportedFeaturedException,
)
-from music_assistant.common.models.media_items import (
- ItemMapping,
- Playlist,
- PlaylistTrack,
- Track,
-)
+from music_assistant.common.models.media_items import ItemMapping, Playlist, PlaylistTrack, Track
from music_assistant.constants import DB_TABLE_PLAYLISTS
from music_assistant.server.helpers.compare import compare_strings
library_item = await self._add_library_item(item)
# preload playlist tracks listing (do not load them in the db)
async for _ in self.tracks(item.item_id, item.provider):
- pass
+ await asyncio.sleep(0) # yield to eventloop
# metadata lookup we need to do after adding it to the db
if metadata_lookup:
await self.mass.metadata.get_playlist_metadata(library_item)
if i.provider_instance == playlist_prov.provider_instance
}
)
+ await asyncio.sleep(0) # yield to eventloop
# check for duplicates
for track_prov in track.provider_mappings:
if (
# existing item found: update it
library_item = await self.update_item_in_library(db_item.item_id, item)
break
+ await asyncio.sleep(0) # yield to eventloop
if not library_item:
# actually add a new item in the library db
# use the lock to prevent a race condition of the same item being added twice
if genre not in playlist_genres:
playlist_genres[genre] = 0
playlist_genres[genre] += 1
+ await asyncio.sleep(0) # yield to eventloop
playlist_genres_filtered = {
genre for genre, count in playlist_genres.items() if count > 5
"true_peak": loudness.true_peak,
"lra": loudness.lra,
"threshold": loudness.threshold,
+ "target_offset": loudness.target_offset,
},
allow_replace=True,
)
await asyncio.to_thread(shutil.copyfile, db_path, db_path_backup)
# handle db migration from previous schema to this one
- if prev_version == 27:
+ if prev_version in (27, 28):
self.logger.info(
"Performing database migration from %s to %s",
prev_version,
true_peak REAL,
lra REAL,
threshold REAL,
+ target_offset REAL,
UNIQUE(item_id, provider));"""
)
await self.database.execute(
from __future__ import annotations
+import asyncio
import random
import time
from collections.abc import AsyncGenerator
elif media_item.media_type == MediaType.PLAYLIST:
async for playlist_track in ctrl.tracks(media_item.item_id, media_item.provider):
tracks.append(playlist_track)
+ await asyncio.sleep(0) # yield to eventloop
await self.mass.music.mark_item_played(
media_item.media_type, media_item.item_id, media_item.provider
)
from aiofiles.os import wrap
from aiohttp import web
-from music_assistant.common.helpers.util import get_ip, select_free_port, try_parse_bool
+from music_assistant.common.helpers.util import (
+ get_ip,
+ select_free_port,
+ try_parse_bool,
+ try_parse_duration,
+)
from music_assistant.common.models.config_entries import (
ConfigEntry,
ConfigValueOption,
output_format_str=request.match_info["fmt"],
queue_player=queue_player,
default_sample_rate=queue_item.streamdetails.audio_format.sample_rate,
- default_bit_depth=queue_item.streamdetails.audio_format.bit_depth,
+ default_bit_depth=24, # always prefer 24 bits to prevent dithering
)
# prepare request, add some DLNA/UPNP compatible headers
headers = {
queue.display_name,
)
queue.index_in_buffer = self.mass.player_queues.index_by_id(queue_id, queue_item_id)
- pcm_format = AudioFormat(
- content_type=ContentType.from_bit_depth(
- queue_item.streamdetails.audio_format.bit_depth
- ),
- sample_rate=queue_item.streamdetails.audio_format.sample_rate,
- bit_depth=queue_item.streamdetails.audio_format.bit_depth,
- )
- async for chunk in get_ffmpeg_stream(
- audio_input=self._get_media_stream(
- streamdetails=queue_item.streamdetails,
- pcm_format=pcm_format,
- ),
- input_format=pcm_format,
+ async for chunk in self.get_media_stream(
+ streamdetails=queue_item.streamdetails,
output_format=output_format,
- filter_params=get_player_filter_params(self.mass, queue_player.player_id),
+ extra_filter_params=get_player_filter_params(self.mass, queue_player.player_id),
):
try:
await resp.write(chunk)
output_format_str=request.match_info["fmt"],
queue_player=queue_player,
default_sample_rate=FLOW_DEFAULT_SAMPLE_RATE,
- default_bit_depth=FLOW_DEFAULT_BIT_DEPTH,
+ default_bit_depth=24, # always prefer 24 bits to prevent dithering
)
# play it safe: only allow icy metadata for mp3 and aac
enable_icy = request.headers.get(
bytes_written = 0
buffer = b""
# handle incoming audio chunks
- async for chunk in self._get_media_stream(
+ async for chunk in self.get_media_stream(
queue_track.streamdetails,
- pcm_format=pcm_format,
- # strip silence from begin/end if track is being crossfaded
- strip_silence_begin=use_crossfade and total_bytes_sent > 0,
- strip_silence_end=use_crossfade,
+ output_format=pcm_format,
):
# buffer size needs to be big enough to include the crossfade part
# allow it to be a bit smaller when playback just starts
- if not use_crossfade:
- req_buffer_size = pcm_sample_size
- elif (total_bytes_sent + bytes_written) < crossfade_size:
- req_buffer_size = int(crossfade_size / 2)
+ if not use_crossfade or (total_bytes_sent + bytes_written == 0):
+ req_buffer_size = pcm_sample_size * 2
+ elif (total_bytes_sent + bytes_written) < (crossfade_size * 2):
+ req_buffer_size = pcm_sample_size * 5
else:
- req_buffer_size = crossfade_size
+ # additional 5 seconds to strip silence from last part
+ req_buffer_size = crossfade_size + pcm_sample_size * 5
# ALWAYS APPEND CHUNK TO BUFFER
buffer += chunk
#### HANDLE CROSSFADE OF PREVIOUS TRACK AND NEW TRACK
if last_fadeout_part:
+ # strip leading silence from the start of this (new) part
+ buffer = await strip_silence(
+ self.mass,
+ buffer,
+ sample_rate=pcm_format.sample_rate,
+ bit_depth=pcm_format.bit_depth,
+ reverse=False,
+ )
# perform crossfade
fadein_part = buffer[:crossfade_size]
remaining_bytes = buffer[crossfade_size:]
#### OTHER: enough data in buffer, feed to output
while len(buffer) > req_buffer_size:
yield buffer[:pcm_sample_size]
+ await asyncio.sleep(0) # yield to eventloop
bytes_written += pcm_sample_size
buffer = buffer[pcm_sample_size:]
bytes_written += len(last_fadeout_part)
last_fadeout_part = b""
if use_crossfade:
+ # strip silence from last part
+ buffer = await strip_silence(
+ self.mass,
+ buffer,
+ sample_rate=pcm_format.sample_rate,
+ bit_depth=pcm_format.bit_depth,
+ reverse=True,
+ )
# if crossfade is enabled, save fadeout part to pickup for next track
last_fadeout_part = buffer[-crossfade_size:]
remaining_bytes = buffer[:-crossfade_size]
"-i",
ANNOUNCE_ALERT_FILE,
"-filter_complex",
- "[1:a][0:a]concat=n=2:v=0:a=1,loudnorm=I=-10:LRA=11:TP=-2",
+ "[1:a][0:a]concat=n=2:v=0:a=1,loudnorm=I=-10:LRA=11:TP=-1.5",
]
filter_params = []
async for chunk in get_ffmpeg_stream(
):
yield chunk
- async def _get_media_stream(
+ async def get_media_stream(
self,
streamdetails: StreamDetails,
- pcm_format: AudioFormat,
- strip_silence_begin: bool = False,
- strip_silence_end: bool = False,
+ output_format: AudioFormat,
+ extra_filter_params: list[str] | None = None,
) -> AsyncGenerator[tuple[bool, bytes], None]:
- """
- Get the (raw PCM) audio stream for the given streamdetails.
-
- Other than stripping silence at end and beginning and optional
- volume normalization this is the pure, unaltered audio data as PCM chunks.
- """
+ """Get the audio stream for the given streamdetails."""
logger = self.logger.getChild("media_stream")
is_radio = streamdetails.media_type == MediaType.RADIO or not streamdetails.duration
if is_radio:
streamdetails.seek_position = 0
- strip_silence_begin = False
- strip_silence_end = False
- if streamdetails.seek_position:
- strip_silence_begin = False
- if not streamdetails.duration or streamdetails.duration < 30:
- strip_silence_end = False
- # pcm_sample_size = chunk size = 1 second of pcm audio
- pcm_sample_size = pcm_format.pcm_sample_size
- buffer_size = (
- pcm_sample_size * 5
- if (strip_silence_begin or strip_silence_end)
- # always require a small amount of buffer to prevent livestreams stuttering
- else pcm_sample_size * 2
- )
# collect all arguments for ffmpeg
- filter_params = []
+ filter_params = extra_filter_params or []
if streamdetails.target_loudness is not None:
# add loudnorm filters
- filter_rule = f"loudnorm=I={streamdetails.target_loudness}:LRA=11:TP=-2"
+ filter_rule = f"loudnorm=I={streamdetails.target_loudness}:TP=-1.5:LRA=11"
if streamdetails.loudness:
filter_rule += f":measured_I={streamdetails.loudness.integrated}"
filter_rule += f":measured_LRA={streamdetails.loudness.lra}"
filter_rule += f":measured_tp={streamdetails.loudness.true_peak}"
filter_rule += f":measured_thresh={streamdetails.loudness.threshold}"
+ if streamdetails.loudness.target_offset is not None:
+ filter_rule += f":offset={streamdetails.loudness.target_offset}"
+ filter_rule += ":linear=true"
filter_rule += ":print_format=json"
filter_params.append(filter_rule)
if streamdetails.fade_in:
filter_params.append("afade=type=in:start_time=0:duration=3")
-
if streamdetails.stream_type == StreamType.CUSTOM:
audio_source = self.mass.get_provider(streamdetails.provider).get_audio_stream(
streamdetails,
extra_input_args = []
if streamdetails.seek_position and streamdetails.stream_type != StreamType.CUSTOM:
extra_input_args += ["-ss", str(int(streamdetails.seek_position))]
- logger.debug("start media stream for: %s", streamdetails.uri)
- state_data = {"finished": asyncio.Event(), "bytes_sent": 0}
+ logger.debug("start media stream for: %s", streamdetails.uri)
+ bytes_sent = 0
+ finished = False
async with FFMpeg(
audio_input=audio_source,
input_format=streamdetails.audio_format,
- output_format=pcm_format,
+ output_format=output_format,
filter_params=filter_params,
extra_input_args=[
*extra_input_args,
"-filter_threads",
"1",
],
- name="ffmpeg_media_stream",
- stderr_enabled=True,
+ collect_log_history=True,
+ logger=logger,
) as ffmpeg_proc:
-
- async def log_reader():
- # To prevent stderr locking up, we must keep reading it
- stderr_data = ""
- async for line in ffmpeg_proc.iter_stderr():
- if "error" in line or "warning" in line:
- logger.warning(line)
- elif "critical" in line:
- logger.critical(line)
- elif (
- streamdetails.audio_format.content_type == ContentType.UNKNOWN
- and line.startswith("Stream #0:0: Audio: ")
- ):
- # if streamdetails contenttype is unknown, try parse it from the ffmpeg log
- streamdetails.audio_format.content_type = ContentType.try_parse(
- line.split("Stream #0:0: Audio: ")[1].split(" ")[0]
- )
- elif stderr_data or "loudnorm" in line:
- stderr_data += line
- else:
- logger.debug(line)
- del line
-
- # if we reach this point, the process is finished (completed or aborted)
- if ffmpeg_proc.returncode == 0:
- await state_data["finished"].wait()
- finished = state_data["finished"].is_set()
- bytes_sent = state_data["bytes_sent"]
- seconds_streamed = bytes_sent / pcm_format.pcm_sample_size if bytes_sent else 0
- streamdetails.seconds_streamed = seconds_streamed
- state_str = "finished" if finished else "aborted"
+ try:
+ async for chunk in ffmpeg_proc.iter_any():
+ bytes_sent += len(chunk)
+ yield chunk
+ del chunk
+ finished = True
+ finally:
+ await ffmpeg_proc.close()
logger.debug(
- "stream %s for: %s (%s seconds streamed, exitcode %s)",
- state_str,
- streamdetails.uri,
- seconds_streamed,
+ "stream %s (with code %s) for %s",
+ "finished" if finished else "aborted",
ffmpeg_proc.returncode,
+ streamdetails.uri,
)
+ # try to determine how many seconds we've streamed
+ seconds_streamed = 0
+ if output_format.content_type.is_pcm():
+ seconds_streamed = (
+ bytes_sent / output_format.pcm_sample_size if bytes_sent else 0
+ )
+ elif line := next((x for x in ffmpeg_proc.log_history if "time=" in x), None):
+ duration_str = line.split("time=")[1].split(" ")[0]
+ seconds_streamed = try_parse_duration(duration_str)
+
+ if seconds_streamed:
+ streamdetails.seconds_streamed = seconds_streamed
# store accurate duration
- if finished and not streamdetails.seek_position:
+ if finished and not streamdetails.seek_position and seconds_streamed:
streamdetails.duration = seconds_streamed
# parse loudnorm data if we have that collected
- if stderr_data and (loudness_details := parse_loudnorm(stderr_data)):
+ if loudness_details := parse_loudnorm(" ".join(ffmpeg_proc.log_history)):
required_seconds = 600 if streamdetails.media_type == MediaType.RADIO else 120
if finished or (seconds_streamed >= required_seconds):
logger.debug(
- "Loudness measurement for %s: %s", streamdetails.uri, loudness_details
+ "Loudness measurement for %s: %s",
+ streamdetails.uri,
+ loudness_details,
)
streamdetails.loudness = loudness_details
- await self.mass.music.set_track_loudness(
- streamdetails.item_id, streamdetails.provider, loudness_details
+ self.mass.create_task(
+ self.mass.music.set_track_loudness(
+ streamdetails.item_id, streamdetails.provider, loudness_details
+ )
)
# report playback
if finished or seconds_streamed > 30:
self.mass.create_task(
self.mass.music.mark_item_played(
- streamdetails.media_type, streamdetails.item_id, streamdetails.provider
+ streamdetails.media_type,
+ streamdetails.item_id,
+ streamdetails.provider,
)
)
if music_prov := self.mass.get_provider(streamdetails.provider):
self.mass.create_task(
music_prov.on_streamed(streamdetails, seconds_streamed)
)
- # cleanup
- del stderr_data
-
- self.mass.create_task(log_reader())
-
- # get pcm chunks from stdout
- # we always stay buffer_size of bytes behind
- # so we can strip silence at the beginning and end of a track
- buffer = b""
- chunk_num = 0
- async for chunk in ffmpeg_proc.iter_chunked(pcm_sample_size):
- chunk_num += 1
- buffer += chunk
- del chunk
-
- if len(buffer) < buffer_size:
- # buffer is not full enough, move on
- continue
-
- if strip_silence_begin and chunk_num == 2:
- # first 2 chunks received, strip silence of beginning
- stripped_audio = await strip_silence(
- self.mass,
- buffer,
- sample_rate=pcm_format.sample_rate,
- bit_depth=pcm_format.bit_depth,
- )
- yield stripped_audio
- state_data["bytes_sent"] += len(stripped_audio)
- buffer = b""
- del stripped_audio
- continue
-
- #### OTHER: enough data in buffer, feed to output
- while len(buffer) > buffer_size:
- yield buffer[:pcm_sample_size]
- state_data["bytes_sent"] += pcm_sample_size
- buffer = buffer[pcm_sample_size:]
-
- # all chunks received, strip silence of last part if needed and yield remaining bytes
- if strip_silence_end:
- final_chunk = await strip_silence(
- self.mass,
- buffer,
- sample_rate=pcm_format.sample_rate,
- bit_depth=pcm_format.bit_depth,
- reverse=True,
- )
- else:
- final_chunk = buffer
-
- # yield final chunk to output (as one big chunk)
- yield final_chunk
- state_data["bytes_sent"] += len(final_chunk)
- state_data["finished"].set()
- del final_chunk
- del buffer
def _log_request(self, request: web.Request) -> None:
"""Log request."""
import re
import struct
from collections import deque
+from collections.abc import AsyncGenerator
+from contextlib import suppress
from io import BytesIO
+from signal import SIGINT
from typing import TYPE_CHECKING
import aiofiles
)
from music_assistant.server.helpers.tags import parse_tags
-from .process import AsyncProcess, check_output
+from .process import AsyncProcess, check_output, communicate
from .util import create_tempfile
if TYPE_CHECKING:
- from collections.abc import AsyncGenerator
-
from music_assistant.common.models.player_queue import QueueItem
from music_assistant.server import MusicAssistant
class FFMpeg(AsyncProcess):
"""FFMpeg wrapped as AsyncProcess."""
- def __init__( # noqa: PLR0913
+ def __init__(
self,
audio_input: AsyncGenerator[bytes, None] | str | int,
input_format: AudioFormat,
filter_params: list[str] | None = None,
extra_args: list[str] | None = None,
extra_input_args: list[str] | None = None,
- name: str = "ffmpeg",
- stderr_enabled: bool = False,
audio_output: str | int = "-",
- loglevel: str | None = None,
+ collect_log_history: bool = False,
+ logger: logging.Logger | None = None,
) -> None:
"""Initialize AsyncProcess."""
ffmpeg_args = get_ffmpeg_args(
input_path=audio_input if isinstance(audio_input, str) else "-",
output_path=audio_output if isinstance(audio_output, str) else "-",
extra_input_args=extra_input_args or [],
- loglevel=loglevel or "info"
- if stderr_enabled or LOGGER.isEnabledFor(VERBOSE_LOG_LEVEL)
- else "error",
+ loglevel="info",
)
+ self.audio_input = audio_input
+ self.input_format = input_format
+ self.collect_log_history = collect_log_history
+ self.log_history: deque[str] = deque(maxlen=100)
+ self._stdin_task: asyncio.Task | None = None
+ self._logger_task: asyncio.Task | None = None
super().__init__(
ffmpeg_args,
- stdin=True if isinstance(audio_input, str) else audio_input,
+ stdin=True if isinstance(audio_input, str | AsyncGenerator) else audio_input,
stdout=True if isinstance(audio_output, str) else audio_output,
- stderr=stderr_enabled,
- name=name,
+ stderr=True,
)
+ self.logger = logger or LOGGER.getChild("ffmpeg")
+
+ async def start(self) -> None:
+ """Perform Async init of process."""
+ await super().start()
+ self._logger_task = asyncio.create_task(self._log_reader_task())
+ if isinstance(self.audio_input, AsyncGenerator):
+ self._stdin_task = asyncio.create_task(self._feed_stdin())
+
+ async def close(self, send_signal: bool = True) -> None:
+ """Close/terminate the process and wait for exit."""
+ if self._stdin_task and not self._stdin_task.done():
+ self._stdin_task.cancel()
+ with suppress(asyncio.CancelledError):
+ await self._stdin_task
+ # make sure the stdin generator is also properly closed
+ # by propagating a CancelledError within it
+ task = asyncio.create_task(self.audio_input.__anext__())
+ task.cancel()
+ if not self.collect_log_history:
+ await super().close(send_signal)
+ return
+ # override close logic to make sure we catch all logging
+ self._close_called = True
+ if send_signal and self.returncode is None:
+ self.proc.send_signal(SIGINT)
+ if self.proc.stdin and not self.proc.stdin.is_closing():
+ self.proc.stdin.close()
+ await asyncio.sleep(0) # yield to loop
+ # abort existing readers on stdout first before we send communicate
+ if self.proc.stdout:
+ if self.proc.stdout._waiter is not None:
+ with suppress(asyncio.exceptions.InvalidStateError):
+ self.proc.stdout._waiter.set_exception(asyncio.CancelledError())
+ # read remaining bytes to unblock pipe
+ await self.read(-1)
+ # wait for log task to complete that reads the remaining data from stderr
+ with suppress(TimeoutError):
+ await asyncio.wait_for(self._logger_task, 5)
+ await super().close(False)
+
+ async def _log_reader_task(self) -> None:
+ """Read ffmpeg log from stderr."""
+ async for line in self.iter_stderr():
+ if self.collect_log_history:
+ self.log_history.append(line)
+ if "error" in line or "warning" in line:
+ self.logger.warning(line)
+ elif "critical" in line:
+ self.logger.critical(line)
+ else:
+ self.logger.log(VERBOSE_LOG_LEVEL, line)
+
+ # if the input content type is unknown, try to parse it from the ffmpeg log
+ if line.startswith("Stream #0:0: Audio: "):
+ if self.input_format.content_type == ContentType.UNKNOWN:
+ content_type_raw = line.split("Stream #0:0: Audio: ")[1].split(" ")[0]
+ content_type = ContentType.try_parse(content_type_raw)
+ self.logger.info(
+ "Detected (input) content type: %s (%s)", content_type, content_type_raw
+ )
+ self.input_format.content_type = content_type
+ del line
+
+ async def _feed_stdin(self) -> None:
+ """Feed stdin with audio chunks from an AsyncGenerator."""
+ if TYPE_CHECKING:
+ self.audio_input: AsyncGenerator[bytes, None]
+ async for chunk in self.audio_input:
+ await self.write(chunk)
+ # write EOF once we've reached the end of the input stream
+ await self.write_eof()
async def crossfade_pcm_parts(
fmt,
"-",
]
- async with AsyncProcess(args, stdin=True, stdout=True) as proc:
- crossfade_data, _ = await proc.communicate(fade_in_part)
- if crossfade_data:
- LOGGER.log(
- 5,
- "crossfaded 2 pcm chunks. fade_in_part: %s - "
- "fade_out_part: %s - fade_length: %s seconds",
- len(fade_in_part),
- len(fade_out_part),
- fade_length,
- )
- return crossfade_data
- # no crossfade_data, return original data instead
- LOGGER.debug(
- "crossfade of pcm chunks failed: not enough data? "
- "fade_in_part: %s - fade_out_part: %s",
+ _returncode, crossfaded_audio, _stderr = await communicate(args, fade_in_part)
+ if crossfaded_audio:
+ LOGGER.log(
+ 5,
+ "crossfaded 2 pcm chunks. fade_in_part: %s - "
+ "fade_out_part: %s - fade_length: %s seconds",
len(fade_in_part),
len(fade_out_part),
+ fade_length,
)
- return fade_out_part + fade_in_part
+ return crossfaded_audio
+ # no crossfade_data, return original data instead
+ LOGGER.debug(
+ "crossfade of pcm chunks failed: not enough data? " "fade_in_part: %s - fade_out_part: %s",
+ len(fade_in_part),
+ len(fade_out_part),
+ )
+ return fade_out_part + fade_in_part
async def strip_silence(
]
# output args
args += ["-f", fmt, "-"]
- async with AsyncProcess(args, stdin=True, stdout=True) as proc:
- stripped_data, _ = await proc.communicate(audio_data)
+ _returncode, stripped_data, _stderr = await communicate(args, audio_data)
# return stripped audio
bytes_stripped = len(audio_data) - len(stripped_data)
extra_args: list[str] | None = None,
chunk_size: int | None = None,
extra_input_args: list[str] | None = None,
- name: str = "ffmpeg",
+ logger: logging.Logger | None = None,
) -> AsyncGenerator[bytes, None]:
"""
Get the ffmpeg audio stream as async generator.
filter_params=filter_params,
extra_args=extra_args,
extra_input_args=extra_input_args,
- name=name,
+ logger=logger,
) as ffmpeg_proc:
# read final chunks from stdout
iterator = ffmpeg_proc.iter_chunked(chunk_size) if chunk_size else ffmpeg_proc.iter_any()
output_args = ["-f", "wav", output_path]
else:
# use explicit format identifier for all other
- output_args = ["-f", output_format.content_type.value, output_path]
+ output_args = [
+ "-f",
+ output_format.content_type.value,
+ "-ar",
+ str(output_format.sample_rate),
+ output_path,
+ ]
- # prefer libsoxr high quality resampler (if present) for sample rate conversions
- if input_format.sample_rate != output_format.sample_rate and libsoxr_support:
- filter_params.append("aresample=resampler=soxr")
+ # determine if we need to do resampling
+ if (
+ input_format.sample_rate != output_format.sample_rate
+ or input_format.bit_depth != output_format.bit_depth
+ ):
+ # prefer resampling with libsoxr due to its high quality
+ resample_filter = f'aresample=resampler={"soxr" if libsoxr_support else "swr"}'
+ if output_format.bit_depth < input_format.bit_depth:
+ # apply dithering when going down to 16 bits
+ resample_filter += ":osf=s16:dither_method=triangular_hp"
+ if not output_format.content_type.is_pcm():
+ # specify sample rate if output format is not pcm
+ resample_filter += f":osr={output_format.sample_rate}"
+ filter_params.append(resample_filter)
if filter_params and "-filter_complex" not in extra_args:
extra_args += ["-af", ",".join(filter_params)]
true_peak=float(loudness_data["input_tp"]),
lra=float(loudness_data["input_lra"]),
threshold=float(loudness_data["input_thresh"]),
+ target_offset=float(loudness_data["target_offset"]),
)
from __future__ import annotations
+import asyncio
from typing import TYPE_CHECKING, Any
import aiosqlite
yield item
if len(next_items) < limit:
break
+ await asyncio.sleep(0) # yield to eventloop
offset += limit
async def vacuum(self) -> None:
from contextlib import suppress
from signal import SIGINT
from types import TracebackType
-from typing import TYPE_CHECKING
+from typing import Self
from music_assistant.constants import MASS_LOGGER_NAME
def __init__(
self,
args: list[str],
- stdin: bool | int | AsyncGenerator[bytes, None] | None = None,
+ stdin: bool | int | None = None,
stdout: bool | int | None = None,
- stderr: bool | int | None = None,
+ stderr: bool | int | None = False,
name: str | None = None,
) -> None:
"""Initialize AsyncProcess."""
if name is None:
name = args[0].split(os.sep)[-1]
self.name = name
- self.attached_tasks: list[asyncio.Task] = []
self.logger = LOGGER.getChild(name)
self._args = args
self._stdin = None if stdin is False else stdin
self._returncode = ret_code
return ret_code
- async def __aenter__(self) -> AsyncProcess:
+ async def __aenter__(self) -> Self:
"""Enter context manager."""
await self.start()
return self
"""Perform Async init of process."""
self.proc = await asyncio.create_subprocess_exec(
*self._args,
- stdin=asyncio.subprocess.PIPE
- if (self._stdin is True or isinstance(self._stdin, AsyncGenerator))
- else self._stdin,
+ stdin=asyncio.subprocess.PIPE if self._stdin is True else self._stdin,
stdout=asyncio.subprocess.PIPE if self._stdout is True else self._stdout,
stderr=asyncio.subprocess.PIPE if self._stderr is True else self._stderr,
# because we're exchanging big amounts of (audio) data with pipes
pipesize=1000000,
)
self.logger.debug("Process %s started with PID %s", self.name, self.proc.pid)
- if isinstance(self._stdin, AsyncGenerator):
- self.attached_tasks.append(asyncio.create_task(self._feed_stdin()))
async def iter_chunked(self, n: int = DEFAULT_CHUNKSIZE) -> AsyncGenerator[bytes, None]:
"""Yield chunks of n size from the process stdout."""
- while not self._close_called:
+ while True:
chunk = await self.readexactly(n)
if chunk == b"":
break
async def iter_any(self, n: int = DEFAULT_CHUNKSIZE) -> AsyncGenerator[bytes, None]:
"""Yield chunks as they come in from process stdout."""
- while not self._close_called:
+ while True:
chunk = await self.read(n)
if chunk == b"":
break
async def write(self, data: bytes) -> None:
"""Write data to process stdin."""
if self._close_called:
- raise RuntimeError("write called while process already done")
+ self.logger.warning("write called while process already done")
+ return
self.proc.stdin.write(data)
with suppress(BrokenPipeError, ConnectionResetError):
await self.proc.stdin.drain()
# already exited, race condition
pass
- async def close(self, send_signal: bool = False) -> int:
+ async def read_stderr(self) -> bytes:
+ """Read line from stderr."""
+ try:
+ return await self.proc.stderr.readline()
+ except ValueError as err:
+ # we're waiting for a line (separator found), but the line was too big
+ # this may happen with ffmpeg during a long (radio) stream where progress
+ # gets outputted to the stderr but no newline
+ # https://stackoverflow.com/questions/55457370/how-to-avoid-valueerror-separator-is-not-found-and-chunk-exceed-the-limit
+ # NOTE: this consumes the line that was too big
+ if "chunk exceed the limit" in str(err):
+ return await self.proc.stderr.readline()
+ # raise for all other (value) errors
+ raise
+
+ async def iter_stderr(self) -> AsyncGenerator[str, None]:
+ """Iterate lines from the stderr stream as string."""
+ while True:
+ line = await self.read_stderr()
+ if line == b"":
+ break
+ line = line.decode().strip()
+ if not line:
+ continue
+ yield line
+
+ async def close(self, send_signal: bool = False) -> None:
"""Close/terminate the process and wait for exit."""
self._close_called = True
- # close any/all attached (writer) tasks
- for task in self.attached_tasks:
- if not task.done():
- task.cancel()
if send_signal and self.returncode is None:
self.proc.send_signal(SIGINT)
-
+ if self.proc.stdin and not self.proc.stdin.is_closing():
+ self.proc.stdin.close()
+ await asyncio.sleep(0) # yield to loop
# abort existing readers on stderr/stdout first before we send communicate
if self.proc.stdout and self.proc.stdout._waiter is not None:
- self.proc.stdout._waiter.set_exception(asyncio.CancelledError())
- self.proc.stdout._waiter = None
+ with suppress(asyncio.exceptions.InvalidStateError):
+ self.proc.stdout._waiter.set_exception(asyncio.CancelledError())
if self.proc.stderr and self.proc.stderr._waiter is not None:
- self.proc.stderr._waiter.set_exception(asyncio.CancelledError())
- self.proc.stderr._waiter = None
+ with suppress(asyncio.exceptions.InvalidStateError):
+ self.proc.stderr._waiter.set_exception(asyncio.CancelledError())
# make sure the process is really cleaned up.
# especially with pipes this can cause deadlocks if not properly guarded
# we need to ensure stdout and stderr are flushed and stdin closed
- while True:
+ while self.returncode is None:
try:
- async with asyncio.timeout(5):
- # use communicate to flush all pipe buffers
- await self.proc.communicate()
- if self.returncode is not None:
- break
+ # use communicate to flush all pipe buffers
+ await asyncio.wait_for(self.proc.communicate(), 5)
except TimeoutError:
self.logger.debug(
"Process %s with PID %s did not stop in time. Sending terminate...",
self.proc.pid,
self.returncode,
)
- return self.returncode
async def wait(self) -> int:
"""Wait for the process and return the returncode."""
self._returncode = await self.proc.wait()
return self._returncode
- async def communicate(self, input_data: bytes | None = None) -> tuple[bytes, bytes]:
- """Write bytes to process and read back results."""
- stdout, stderr = await self.proc.communicate(input_data)
- self._returncode = self.proc.returncode
- return (stdout, stderr)
-
- async def _feed_stdin(self) -> None:
- """Feed stdin with chunks from an AsyncGenerator."""
- if TYPE_CHECKING:
- self._stdin: AsyncGenerator[bytes, None]
- try:
- async for chunk in self._stdin:
- if self._close_called or self.proc.stdin.is_closing():
- return
- await self.write(chunk)
- await self.write_eof()
- except Exception as err:
- if not isinstance(err, asyncio.CancelledError):
- self.logger.exception(err)
- # make sure the stdin generator is also properly closed
- # by propagating a cancellederror within
- task = asyncio.create_task(self._stdin.__anext__())
- task.cancel()
-
- async def read_stderr(self) -> bytes:
- """Read line from stderr."""
- try:
- return await self.proc.stderr.readline()
- except ValueError as err:
- # we're waiting for a line (separator found), but the line was too big
- # this may happen with ffmpeg during a long (radio) stream where progress
- # gets outputted to the stderr but no newline
- # https://stackoverflow.com/questions/55457370/how-to-avoid-valueerror-separator-is-not-found-and-chunk-exceed-the-limit
- # NOTE: this consumes the line that was too big
- if "chunk exceed the limit" in str(err):
- return await self.proc.stderr.readline()
- # raise for all other (value) errors
- raise
-
- async def iter_stderr(self) -> AsyncGenerator[str, None]:
- """Iterate lines from the stderr stream as string."""
- while True:
- line = await self.read_stderr()
- if line == b"":
- break
- line = line.decode().strip()
- if not line:
- continue
- yield line
-
async def check_output(args: str | list[str]) -> tuple[int, bytes]:
- """Run subprocess and return output."""
+ """Run subprocess and return returncode and output."""
if isinstance(args, str):
proc = await asyncio.create_subprocess_shell(
args,
)
stdout, _ = await proc.communicate()
return (proc.returncode, stdout)
+
+
+async def communicate(
+    args: str | list[str],
+    input: bytes | None = None,  # noqa: A002
+) -> tuple[int, bytes, bytes]:
+    """Communicate with subprocess and return returncode, stdout and stderr output."""
+    # A plain string is run through the shell; a list of args is executed directly
+    # (no shell) — mirroring asyncio's subprocess_shell / subprocess_exec split.
+    if isinstance(args, str):
+        proc = await asyncio.create_subprocess_shell(
+            args,
+            stderr=asyncio.subprocess.PIPE,
+            stdout=asyncio.subprocess.PIPE,
+            # only attach a stdin pipe when there is actually data to feed
+            stdin=asyncio.subprocess.PIPE if input is not None else None,
+        )
+    else:
+        proc = await asyncio.create_subprocess_exec(
+            *args,
+            stderr=asyncio.subprocess.PIPE,
+            stdout=asyncio.subprocess.PIPE,
+            stdin=asyncio.subprocess.PIPE if input is not None else None,
+        )
+    # communicate() waits for process exit, so proc.returncode is set afterwards
+    stdout, stderr = await proc.communicate(input)
+    return (proc.returncode, stdout, stderr)
from __future__ import annotations
+import asyncio
from typing import TYPE_CHECKING
from music_assistant.common.models.enums import MediaType, ProviderFeature
library_item.item_id, prov_item
)
cur_db_ids.add(library_item.item_id)
+ await asyncio.sleep(0) # yield to eventloop
except MusicAssistantError as err:
self.logger.warning(
"Skipping sync of item %s - error details: %s", prov_item.uri, str(err)
else:
# otherwise: just unmark favorite
await controller.set_favorite(db_id, False)
+ await asyncio.sleep(0) # yield to eventloop
await self.mass.cache.set(cache_key, list(cur_db_ids))
# DO NOT OVERRIDE BELOW
CONF_ENTRY_ANNOUNCE_VOLUME_MIN,
CONF_ENTRY_ANNOUNCE_VOLUME_STRATEGY,
CONF_ENTRY_AUTO_PLAY,
+ CONF_ENTRY_FLOW_MODE,
CONF_ENTRY_HIDE_PLAYER,
CONF_ENTRY_PLAYER_ICON,
CONF_ENTRY_PLAYER_ICON_GROUP,
"""Return all (provider/player specific) Config Entries for the given player (if any)."""
entries = (
CONF_ENTRY_PLAYER_ICON,
+ CONF_ENTRY_FLOW_MODE,
CONF_ENTRY_VOLUME_NORMALIZATION,
CONF_ENTRY_AUTO_PLAY,
CONF_ENTRY_VOLUME_NORMALIZATION_TARGET,
from music_assistant.common.models.player import DeviceInfo, Player, PlayerMedia
from music_assistant.common.models.player_queue import PlayerQueue
from music_assistant.constants import CONF_SYNC_ADJUST, VERBOSE_LOG_LEVEL
-from music_assistant.server.helpers.audio import (
- get_ffmpeg_args,
- get_ffmpeg_stream,
- get_player_filter_params,
-)
+from music_assistant.server.helpers.audio import FFMpeg, get_ffmpeg_stream, get_player_filter_params
from music_assistant.server.helpers.process import AsyncProcess, check_output
from music_assistant.server.models.player_provider import PlayerProvider
self._started = asyncio.Event()
self._stopped = False
+    @property
+    def running(self) -> bool:
+        """Return boolean if this stream is running."""
+        # running means: start() has signalled _started and stop() was not called yet
+        return not self._stopped and self._started.is_set()
+
async def start(self, start_ntp: int, wait_start: int = 1000) -> None:
"""Initialize CLIRaop process for a player."""
extra_args = []
elif self.prov.logger.isEnabledFor(VERBOSE_LOG_LEVEL):
extra_args += ["-debug", "10"]
+ # create os pipes to pipe ffmpeg to cliraop
+ read, write = await asyncio.to_thread(os.pipe)
+
+ # ffmpeg handles the player specific stream + filters and pipes
+ # audio to the cliraop process
+ self._ffmpeg_proc = FFMpeg(
+ audio_input="-",
+ input_format=self.input_format,
+ output_format=AIRPLAY_PCM_FORMAT,
+ filter_params=get_player_filter_params(self.mass, player_id),
+ audio_output=write,
+ logger=self.airplay_player.logger.getChild("ffmpeg"),
+ )
+ await self._ffmpeg_proc.start()
+ await asyncio.to_thread(os.close, write)
+
+ # cliraop is the binary that handles the actual raop streaming to the player
cliraop_args = [
self.prov.cliraop_bin,
"-ntpstart",
self.airplay_player.address,
"-",
]
+ self._cliraop_proc = AsyncProcess(cliraop_args, stdin=read, stderr=True, name="cliraop")
if platform.system() == "Darwin":
os.environ["DYLD_LIBRARY_PATH"] = "/usr/local/lib"
-
- # ffmpeg handles the player specific stream + filters and pipes
- # audio to the cliraop process
- read, write = await asyncio.to_thread(os.pipe)
- ffmpeg_args = get_ffmpeg_args(
- input_format=self.input_format,
- output_format=AIRPLAY_PCM_FORMAT,
- filter_params=get_player_filter_params(self.mass, player_id),
- loglevel="fatal",
- )
- self._ffmpeg_proc = AsyncProcess(
- ffmpeg_args,
- stdin=True,
- stdout=write,
- name="cliraop_ffmpeg",
- )
- await self._ffmpeg_proc.start()
- await asyncio.to_thread(os.close, write)
-
- self._cliraop_proc = AsyncProcess(
- cliraop_args, stdin=read, stdout=False, stderr=True, name="cliraop"
- )
await self._cliraop_proc.start()
await asyncio.to_thread(os.close, read)
self._started.set()
logger.log(VERBOSE_LOG_LEVEL, line)
# if we reach this point, the process exited
- self.running = False
if airplay_player.active_stream == self:
mass_player.state = PlayerState.IDLE
self.mass.players.update(airplay_player.player_id)
for airplay_player in self._get_sync_clients(player_id):
if airplay_player.active_stream:
tg.create_task(airplay_player.active_stream.stop())
+ if mass_player := self.mass.players.get(airplay_player.player_id):
+ mass_player.state = PlayerState.IDLE
+ self.mass.players.update(mass_player.player_id)
async def cmd_play(self, player_id: str) -> None:
"""Send PLAY (unpause) command to given player.
# get current ntp and start cliraop
_, stdout = await check_output(f"{self.cliraop_bin} -ntp")
start_ntp = int(stdout.strip())
- wait_start = 1000 + (500 * len(sync_clients))
+ wait_start = 1250 + (250 * len(sync_clients))
async with asyncio.TaskGroup() as tg:
for airplay_player in self._get_sync_clients(player_id):
tg.create_task(airplay_player.active_stream.start(start_ntp, wait_start))
# so we debounce the resync a bit here with a timer
self.mass.call_later(
1,
- self.mass.player_queues.resume(active_queue.queue_id, fade_in=False),
+ self.mass.player_queues.resume,
+ active_queue.queue_id,
+ fade_in=False,
task_id=f"resume_{active_queue.queue_id}",
)
else:
async def check_binary(cliraop_path: str) -> str | None:
try:
- cliraop = await asyncio.create_subprocess_exec(
- *[cliraop_path, "-check"],
- stdout=asyncio.subprocess.PIPE,
- stderr=asyncio.subprocess.STDOUT,
+ returncode, output = await check_output(
+ [cliraop_path, "-check"],
)
- stdout, _ = await cliraop.communicate()
- stdout = stdout.strip().decode()
- if cliraop.returncode == 0 and stdout == "cliraop check":
+ if returncode == 0 and output.strip().decode() == "cliraop check":
self.cliraop_bin = cliraop_path
return cliraop_path
except OSError:
CONF_ENTRY_CROSSFADE_DURATION,
)
-MASS_APP_ID = "46C1A819" # use the cast receiver app from philippe44 for now until we get our own
+MASS_APP_ID = "C35B0678"
# Monkey patch the Media controller here to store the queue items
castplayer.player.elapsed_time = status.current_time
# active source
- if (
- status.content_id
- and self.mass.streams.base_url in status.content_id
- and castplayer.player_id in status.content_id
- ):
+ if castplayer.cc.app_id == MASS_APP_ID:
castplayer.player.active_source = castplayer.player_id
else:
castplayer.player.active_source = castplayer.cc.app_display_name
provider=self.instance_id,
name=item.name,
)
+ await asyncio.sleep(0) # yield to eventloop
async def sync_library(self, media_types: tuple[MediaType, ...]) -> None:
"""Run library sync for this provider."""
if x.provider_instance == self.instance_id
)
prev_checksums[file_name] = db_item.metadata.checksum
+ await asyncio.sleep(0) # yield to eventloop
# process all deleted (or renamed) files first
cur_filenames = set()
# unsupported file extension
continue
cur_filenames.add(item.path)
+ await asyncio.sleep(0) # yield to eventloop
# work out deletions
deleted_files = set(prev_checksums.keys()) - cur_filenames
await self._process_deletions(deleted_files)
from __future__ import annotations
-import asyncio
import platform
from collections.abc import AsyncGenerator
from typing import TYPE_CHECKING
from music_assistant.common.models.streamdetails import StreamDetails
from music_assistant.constants import CONF_PASSWORD, CONF_USERNAME
from music_assistant.server.helpers.audio import get_file_stream
+from music_assistant.server.helpers.process import check_output
from music_assistant.server.providers.filesystem_local import (
CONF_ENTRY_MISSING_ALBUM_ARTIST,
LocalFileSystemProvider,
self.logger.info("Mounting //%s/%s%s to %s", server, share, subfolder, self.base_path)
self.logger.debug("Using mount command: %s", mount_cmd.replace(password, "########"))
- proc = await asyncio.create_subprocess_shell(
- mount_cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE
- )
- _, stderr = await proc.communicate()
- if proc.returncode != 0:
- msg = f"SMB mount failed with error: {stderr.decode()}"
+ returncode, output = await check_output(mount_cmd)
+ if returncode != 0:
+ msg = f"SMB mount failed with error: {output.decode()}"
raise LoginFailed(msg)
async def unmount(self, ignore_error: bool = False) -> None:
"""Unmount the remote share."""
- proc = await asyncio.create_subprocess_shell(
- f"umount {self.base_path}",
- stdout=asyncio.subprocess.PIPE,
- stderr=asyncio.subprocess.PIPE,
- )
- _, stderr = await proc.communicate()
- if proc.returncode != 0 and not ignore_error:
- self.logger.warning("SMB unmount failed with error: %s", stderr.decode())
+ returncode, output = await check_output(f"umount {self.base_path}")
+ if returncode != 0 and not ignore_error:
+ self.logger.warning("SMB unmount failed with error: %s", output.decode())
# so we debounce the resync a bit here with a timer
self.mass.call_later(
1,
- self.mass.player_queues.resume(active_queue.queue_id, fade_in=False),
+ self.mass.player_queues.resume,
+ active_queue.queue_id,
+ fade_in=False,
task_id=f"resume_{active_queue.queue_id}",
)
else:
input_format=input_format,
output_format=DEFAULT_SNAPCAST_FORMAT,
filter_params=get_player_filter_params(self.mass, player_id),
- name="snapcast_ffmpeg",
audio_output=f"tcp://{host}:{port}",
+ logger=self.logger.getChild("ffmpeg"),
) as ffmpeg_proc:
await ffmpeg_proc.wait()
# we need to wait a bit for the stream status to become idle
"--tcp.enabled=true",
"--tcp.port=1705",
]
- async with AsyncProcess(args, stdin=False, stdout=True, stderr=False) as snapserver_proc:
+ async with AsyncProcess(args, stdout=True, name="snapserver") as snapserver_proc:
# keep reading from stdout until exit
async for data in snapserver_proc.iter_any():
data = data.decode().strip() # noqa: PLW2901
from music_assistant.server.helpers.app_vars import app_var
# pylint: enable=no-name-in-module
-from music_assistant.server.helpers.process import AsyncProcess
+from music_assistant.server.helpers.process import AsyncProcess, check_output
from music_assistant.server.models.music_provider import MusicProvider
if TYPE_CHECKING:
if self._ap_workaround:
args += ["--ap-port", "12345"]
bytes_sent = 0
- async with AsyncProcess(args, stdout=True) as librespot_proc:
+ async with AsyncProcess(args, stdout=True, name="librespot") as librespot_proc:
async for chunk in librespot_proc.iter_any():
yield chunk
bytes_sent += len(chunk)
]
if self._ap_workaround:
args += ["--ap-port", "12345"]
- async with AsyncProcess(args, stdout=True) as librespot:
- stdout = await librespot.read(-1)
- if stdout.decode().strip() != "authorized":
+ _returncode, output = await check_output(args)
+ if _returncode == 0 and output.decode().strip() != "authorized":
raise LoginFailed(f"Login failed for username {self.config.get_value(CONF_USERNAME)}")
# get token with (authorized) librespot
scopes = [
]
if self._ap_workaround:
args += ["--ap-port", "12345"]
- async with AsyncProcess(args, stdout=True) as librespot:
- stdout = await librespot.read(-1)
+ _returncode, output = await check_output(args)
duration = round(time.time() - time_start, 2)
try:
- result = json.loads(stdout)
+ result = json.loads(output)
except JSONDecodeError:
self.logger.warning(
"Error while retrieving Spotify token after %s seconds, details: %s",
duration,
- stdout.decode(),
+ output.decode(),
)
return None
self.logger.debug(
async def check_librespot(librespot_path: str) -> str | None:
try:
- librespot = await asyncio.create_subprocess_exec(
- *[librespot_path, "--check"], stdout=asyncio.subprocess.PIPE
- )
- stdout, _ = await librespot.communicate()
- if (
- librespot.returncode == 0
- and b"ok spotty" in stdout
- and b"using librespot" in stdout
- ):
+ returncode, output = await check_output([librespot_path, "--check"])
+ if returncode == 0 and b"ok spotty" in output and b"using librespot" in output:
self._librespot_bin = librespot_path
return librespot_path
except OSError: