PLAYER_QUEUE_STATE = 7
MEDIA_INFO = 8
LIBRARY_ITEMS = 9
+
+
+class VolumeNormalizationMode(StrEnum):
+ """Enum with possible VolumeNormalization modes."""
+
+ DISABLED = "disabled"
+ DYNAMIC = "dynamic"
+ MEASUREMENT_ONLY = "measurement_only"
+ FALLBACK_FIXED_GAIN = "fallback_fixed_gain"
+ FIXED_GAIN = "fixed_gain"
+ FALLBACK_DYNAMIC = "fallback_dynamic"
from mashumaro import DataClassDictMixin
-from music_assistant.common.models.enums import MediaType, StreamType
+from music_assistant.common.models.enums import MediaType, StreamType, VolumeNormalizationMode
from music_assistant.common.models.media_items import AudioFormat
# the fields below will be set/controlled by the streamcontroller
seek_position: int = 0
fade_in: bool = False
- enable_volume_normalization: bool = False
loudness: float | None = None
loudness_album: float | None = None
prefer_album_loudness: bool = False
- force_dynamic_volume_normalization: bool = False
+ volume_normalization_mode: VolumeNormalizationMode | None = None
queue_id: str | None = None
seconds_streamed: float | None = None
target_loudness: float | None = None
CONF_SYNCGROUP_DEFAULT_ON: Final[str] = "syncgroup_default_on"
CONF_ENABLE_ICY_METADATA: Final[str] = "enable_icy_metadata"
CONF_VOLUME_NORMALIZATION_RADIO: Final[str] = "volume_normalization_radio"
+CONF_VOLUME_NORMALIZATION_TRACKS: Final[str] = "volume_normalization_tracks"
+CONF_VOLUME_NORMALIZATION_FIXED_GAIN_RADIO: Final[str] = "volume_normalization_fixed_gain_radio"
+CONF_VOLUME_NORMALIZATION_FIXED_GAIN_TRACKS: Final[str] = "volume_normalization_fixed_gain_tracks"
# config default values
DEFAULT_HOST: Final[str] = "0.0.0.0"
current_index: int | None
elapsed_time: int
stream_title: str | None
+ content_type: str | None
class PlayerQueuesController(CoreController):
while True:
try:
if (next_index := self._get_next_index(queue_id, idx, True)) is not None:
- await self.play_index(queue_id, next_index)
+ await self.play_index(queue_id, next_index, debounce=True)
break
except MediaNotFoundError:
self.logger.warning(
current_index = self._queues[queue_id].current_index
if current_index is None:
return
- await self.play_index(queue_id, max(current_index - 1, 0))
+ await self.play_index(queue_id, max(current_index - 1, 0), debounce=True)
@api_command("player_queues/skip")
async def skip(self, queue_id: str, seconds: int = 10) -> None:
index: int | str,
seek_position: int = 0,
fade_in: bool = False,
+ debounce: bool = False,
) -> None:
"""Play item at index (or item_id) X in queue."""
queue = self._queues[queue_id]
queue.stream_finished = False
queue.end_of_track_reached = False
+ queue.current_item = queue_item
+ self.signal_update(queue_id)
+
# work out if we are playing an album and if we should prefer album loudness
if (
next_index is not None
# NOTE that we debounce this a bit to account for someone hitting the next button
# like a madman. This will prevent the player from being overloaded with requests.
self.mass.call_later(
- 0.25,
+ 1 if debounce else 0.1,
self.mass.players.play_media,
player_id=queue_id,
# transform into PlayerMedia to send to the actual player implementation
media=self.player_media_from_queue_item(queue_item, queue.flow_mode),
task_id=f"play_media_{queue_id}",
)
+ self.signal_update(queue_id)
@api_command("player_queues/transfer")
async def transfer_queue(
stream_title=queue.current_item.streamdetails.stream_title
if queue.current_item and queue.current_item.streamdetails
else None,
+ content_type=queue.current_item.streamdetails.audio_format.output_format_str
+ if queue.current_item and queue.current_item.streamdetails
+ else None,
)
changed_keys = get_changed_keys(prev_state, new_state)
# return early if nothing changed
# enqueue the next track as soon as the player reports
# it has started buffering the given queue item
task_id = f"enqueue_next_{queue_id}"
- self.mass.call_later(0.2, self._enqueue_next, queue, item_id, task_id=task_id)
- # we repeat this task once more after 2 seconds to ensure the player
- # received the command as it may be missed at the first attempt
- # due to a race condition
self.mass.call_later(2, self._enqueue_next, queue, item_id, task_id=task_id)
# Main queue manipulation methods
if queue.index_in_buffer is not None:
task_id = f"enqueue_next_{queue.queue_id}"
self.mass.call_later(
- 1, self._enqueue_next, queue, queue.index_in_buffer, task_id=task_id
+ 5, self._enqueue_next, queue, queue.index_in_buffer, task_id=task_id
)
# always send the base event
ConfigValueOption,
ConfigValueType,
)
-from music_assistant.common.models.enums import ConfigEntryType, ContentType, MediaType, StreamType
+from music_assistant.common.models.enums import (
+ ConfigEntryType,
+ ContentType,
+ MediaType,
+ StreamType,
+ VolumeNormalizationMode,
+)
from music_assistant.common.models.errors import QueueEmpty
from music_assistant.common.models.media_items import AudioFormat
from music_assistant.common.models.streamdetails import StreamDetails
CONF_PUBLISH_IP,
CONF_SAMPLE_RATES,
CONF_VOLUME_NORMALIZATION,
+ CONF_VOLUME_NORMALIZATION_FIXED_GAIN_RADIO,
+ CONF_VOLUME_NORMALIZATION_FIXED_GAIN_TRACKS,
CONF_VOLUME_NORMALIZATION_RADIO,
+ CONF_VOLUME_NORMALIZATION_TRACKS,
MASS_LOGO_ONLINE,
SILENCE_FILE,
VERBOSE_LOG_LEVEL,
check_audio_support,
crossfade_pcm_parts,
get_chunksize,
- get_ffmpeg_stream,
get_hls_radio_stream,
get_hls_substream,
get_icy_radio_stream,
get_silence,
get_stream_details,
)
+from music_assistant.server.helpers.ffmpeg import LOGGER as FFMPEG_LOGGER
+from music_assistant.server.helpers.ffmpeg import get_ffmpeg_stream
from music_assistant.server.helpers.util import get_ips
from music_assistant.server.helpers.webserver import Webserver
from music_assistant.server.models.core_controller import CoreController
DEFAULT_STREAM_HEADERS = {
+ "Server": "Music Assistant",
"transferMode.dlna.org": "Streaming",
"contentFeatures.dlna.org": "DLNA.ORG_OP=00;DLNA.ORG_CI=0;DLNA.ORG_FLAGS=0d500000000000000000000000000000", # noqa: E501
"Cache-Control": "no-cache,must-revalidate",
"Pragma": "no-cache",
- "Connection": "close",
"Accept-Ranges": "none",
+ "Connection": "close",
}
ICY_HEADERS = {
"icy-name": "Music Assistant",
"Make sure that this server can be reached "
"on the given IP and TCP port by players on the local network.",
),
+ ConfigEntry(
+ key=CONF_VOLUME_NORMALIZATION_RADIO,
+ type=ConfigEntryType.STRING,
+ default_value=VolumeNormalizationMode.FALLBACK_DYNAMIC,
+ label="Volume normalization method for radio streams",
+ options=(
+ ConfigValueOption(x.value.replace("_", " ").title(), x.value)
+ for x in VolumeNormalizationMode
+ ),
+ category="audio",
+ ),
+ ConfigEntry(
+ key=CONF_VOLUME_NORMALIZATION_TRACKS,
+ type=ConfigEntryType.STRING,
+ default_value=VolumeNormalizationMode.FALLBACK_DYNAMIC,
+ label="Volume normalization method for tracks",
+ options=(
+ ConfigValueOption(x.value.replace("_", " ").title(), x.value)
+ for x in VolumeNormalizationMode
+ ),
+ category="audio",
+ ),
+ ConfigEntry(
+ key=CONF_VOLUME_NORMALIZATION_FIXED_GAIN_RADIO,
+ type=ConfigEntryType.FLOAT,
+ range=(-20, 10),
+ default_value=-6,
+ label="Fixed/fallback gain adjustment for radio streams",
+ category="audio",
+ ),
+ ConfigEntry(
+ key=CONF_VOLUME_NORMALIZATION_FIXED_GAIN_TRACKS,
+ type=ConfigEntryType.FLOAT,
+ range=(-20, 10),
+ default_value=-6,
+ label="Fixed/fallback gain adjustment for tracks",
+ category="audio",
+ ),
ConfigEntry(
key=CONF_PUBLISH_IP,
type=ConfigEntryType.STRING,
"not be adjusted in regular setups.",
category="advanced",
),
- ConfigEntry(
- key=CONF_VOLUME_NORMALIZATION_RADIO,
- type=ConfigEntryType.STRING,
- default_value="standard",
- label="Volume normalization method to use for radio streams",
- description="Radio streams often have varying loudness levels, especially "
- "during announcements and commercials. \n"
- "You can choose to enforce dynamic volume normalization to radio streams, "
- "even if a (average) loudness measurement for the radio station exists. \n\n"
- "Options: \n"
- "- Disabled - do not apply volume normalization at all \n"
- "- Force dynamic - Enforce dynamic volume levelling at all times \n"
- "- Standard - use normalization based on previous measurement, ",
- options=(
- ConfigValueOption("Disabled", "disabled"),
- ConfigValueOption("Force dynamic", "dynamic"),
- ConfigValueOption("Standard", "standard"),
- ),
- category="advanced",
- ),
)
async def setup(self, config: CoreConfig) -> None:
version,
"with libsoxr support" if libsoxr_support else "",
)
- # copy log level to audio module
+ # copy log level to audio/ffmpeg loggers
AUDIO_LOGGER.setLevel(self.logger.level)
+ FFMPEG_LOGGER.setLevel(self.logger.level)
# start the webserver
self.publish_port = config.get_value(CONF_BIND_PORT)
self.publish_ip = config.get_value(CONF_PUBLISH_IP)
headers = {
**DEFAULT_STREAM_HEADERS,
"Content-Type": f"audio/{output_format.output_format_str}",
- "Accept-Ranges": "none",
- "Cache-Control": "no-cache",
- "Connection": "close",
"icy-name": queue_item.name,
}
resp = web.StreamResponse(
filter_params = []
extra_input_args = []
# handle volume normalization
- if streamdetails.enable_volume_normalization and streamdetails.target_loudness is not None:
- if streamdetails.force_dynamic_volume_normalization or streamdetails.loudness is None:
- # volume normalization with unknown loudness measurement
- # use loudnorm filter in dynamic mode
- # which also collects the measurement on the fly during playback
- # more info: https://k.ylo.ph/2016/04/04/loudnorm.html
- filter_rule = (
- f"loudnorm=I={streamdetails.target_loudness}:TP=-2.0:LRA=10.0:offset=0.0"
- )
- filter_rule += ":print_format=json"
- filter_params.append(filter_rule)
- else:
- # volume normalization with known loudness measurement
- # apply fixed volume/gain correction
- gain_correct = streamdetails.target_loudness - streamdetails.loudness
- gain_correct = round(gain_correct, 2)
- filter_params.append(f"volume={gain_correct}dB")
+ enable_volume_normalization = (
+ streamdetails.target_loudness is not None
+ and streamdetails.volume_normalization_mode != VolumeNormalizationMode.DISABLED
+ )
+ dynamic_volume_normalization = (
+ streamdetails.volume_normalization_mode == VolumeNormalizationMode.DYNAMIC
+ and enable_volume_normalization
+ )
+ if dynamic_volume_normalization:
+ # volume normalization using loudnorm filter (in dynamic mode)
+ # which also collects the measurement on the fly during playback
+ # more info: https://k.ylo.ph/2016/04/04/loudnorm.html
+ filter_rule = f"loudnorm=I={streamdetails.target_loudness}:TP=-2.0:LRA=10.0:offset=0.0"
+ filter_rule += ":print_format=json"
+ filter_params.append(filter_rule)
+ elif (
+ enable_volume_normalization
+ and streamdetails.volume_normalization_mode == VolumeNormalizationMode.FIXED_GAIN
+ ):
+ # apply used defined fixed volume/gain correction
+ gain_correct: float = await self.mass.config.get_core_config_value(
+ CONF_VOLUME_NORMALIZATION_FIXED_GAIN_RADIO
+ if streamdetails.media_type == MediaType.RADIO
+ else CONF_VOLUME_NORMALIZATION_FIXED_GAIN_TRACKS,
+ )
+ gain_correct = round(gain_correct, 2)
+ filter_params.append(f"volume={gain_correct}dB")
+ elif enable_volume_normalization and streamdetails.loudness is not None:
+ # volume normalization with known loudness measurement
+ # apply volume/gain correction
+ gain_correct = streamdetails.target_loudness - streamdetails.loudness
+ gain_correct = round(gain_correct, 2)
+ filter_params.append(f"volume={gain_correct}dB")
+
+ # work out audio source for these streamdetails
if streamdetails.stream_type == StreamType.CUSTOM:
audio_source = self.mass.get_provider(streamdetails.provider).get_audio_stream(
streamdetails,
if streamdetails.media_type == MediaType.RADIO:
# pad some silence before the radio stream starts to create some headroom
# for radio stations that do not provide any look ahead buffer
- # without this, some radio streams jitter a lot
- async for chunk in get_silence(2, pcm_format):
+ # without this, some radio streams jitter a lot, especially with dynamic normalization
+ pad_seconds = 5 if dynamic_volume_normalization else 2
+ async for chunk in get_silence(pad_seconds, pcm_format):
yield chunk
async for chunk in get_media_stream(
import time
from collections import deque
from collections.abc import AsyncGenerator
-from contextlib import suppress
from io import BytesIO
-from signal import SIGINT
from typing import TYPE_CHECKING
import aiofiles
from aiohttp import ClientTimeout
-from music_assistant.common.helpers.global_cache import (
- get_global_cache_value,
- set_global_cache_values,
-)
+from music_assistant.common.helpers.global_cache import set_global_cache_values
from music_assistant.common.helpers.json import JSON_DECODE_EXCEPTIONS, json_loads
from music_assistant.common.helpers.util import clean_stream_title
-from music_assistant.common.models.enums import MediaType, StreamType
+from music_assistant.common.models.config_entries import CoreConfig, PlayerConfig
+from music_assistant.common.models.enums import MediaType, StreamType, VolumeNormalizationMode
from music_assistant.common.models.errors import (
- AudioError,
InvalidDataError,
MediaNotFoundError,
MusicAssistantError,
CONF_VOLUME_NORMALIZATION,
CONF_VOLUME_NORMALIZATION_RADIO,
CONF_VOLUME_NORMALIZATION_TARGET,
+ CONF_VOLUME_NORMALIZATION_TRACKS,
MASS_LOGGER_NAME,
VERBOSE_LOG_LEVEL,
)
-from music_assistant.server.helpers.playlists import (
- HLS_CONTENT_TYPES,
- IsHLSPlaylist,
- PlaylistItem,
- fetch_playlist,
- parse_m3u,
-)
-from music_assistant.server.helpers.tags import parse_tags
-from music_assistant.server.helpers.throttle_retry import BYPASS_THROTTLER
+from .ffmpeg import FFMpeg, get_ffmpeg_stream
+from .playlists import HLS_CONTENT_TYPES, IsHLSPlaylist, PlaylistItem, fetch_playlist, parse_m3u
from .process import AsyncProcess, check_output, communicate
+from .tags import parse_tags
+from .throttle_retry import BYPASS_THROTTLER
from .util import create_tempfile
if TYPE_CHECKING:
HTTP_HEADERS_ICY = {**HTTP_HEADERS, "Icy-MetaData": "1"}
-class FFMpeg(AsyncProcess):
- """FFMpeg wrapped as AsyncProcess."""
-
- def __init__(
- self,
- audio_input: AsyncGenerator[bytes, None] | str | int,
- input_format: AudioFormat,
- output_format: AudioFormat,
- filter_params: list[str] | None = None,
- extra_args: list[str] | None = None,
- extra_input_args: list[str] | None = None,
- audio_output: str | int = "-",
- collect_log_history: bool = False,
- logger: logging.Logger | None = None,
- ) -> None:
- """Initialize AsyncProcess."""
- ffmpeg_args = get_ffmpeg_args(
- input_format=input_format,
- output_format=output_format,
- filter_params=filter_params or [],
- extra_args=extra_args or [],
- input_path=audio_input if isinstance(audio_input, str) else "-",
- output_path=audio_output if isinstance(audio_output, str) else "-",
- extra_input_args=extra_input_args or [],
- loglevel="info",
- )
- self.audio_input = audio_input
- self.input_format = input_format
- self.collect_log_history = collect_log_history
- self.log_history: deque[str] = deque(maxlen=100)
- self._stdin_task: asyncio.Task | None = None
- self._logger_task: asyncio.Task | None = None
- super().__init__(
- ffmpeg_args,
- stdin=True if isinstance(audio_input, str | AsyncGenerator) else audio_input,
- stdout=True if isinstance(audio_output, str) else audio_output,
- stderr=True,
- )
- self.logger = logger or LOGGER.getChild("ffmpeg")
- clean_args = []
- for arg in ffmpeg_args[1:]:
- if arg.startswith("http"):
- clean_args.append("<URL>")
- elif "/" in arg and "." in arg:
- clean_args.append("<FILE>")
- else:
- clean_args.append(arg)
- args_str = " ".join(clean_args)
- self.logger.log(VERBOSE_LOG_LEVEL, "starting ffmpeg with args: %s", args_str)
-
- async def start(self) -> None:
- """Perform Async init of process."""
- await super().start()
- self._logger_task = asyncio.create_task(self._log_reader_task())
- if isinstance(self.audio_input, AsyncGenerator):
- self._stdin_task = asyncio.create_task(self._feed_stdin())
-
- async def close(self, send_signal: bool = True) -> None:
- """Close/terminate the process and wait for exit."""
- if self._stdin_task and not self._stdin_task.done():
- self._stdin_task.cancel()
- if not self.collect_log_history:
- await super().close(send_signal)
- return
- # override close logic to make sure we catch all logging
- self._close_called = True
- if send_signal and self.returncode is None:
- self.proc.send_signal(SIGINT)
- if self.proc.stdin and not self.proc.stdin.is_closing():
- self.proc.stdin.close()
- await asyncio.sleep(0) # yield to loop
- # abort existing readers on stdout first before we send communicate
- waiter: asyncio.Future
- if self.proc.stdout and (waiter := self.proc.stdout._waiter):
- self.proc.stdout._waiter = None
- if waiter and not waiter.done():
- waiter.set_exception(asyncio.CancelledError())
- # read remaining bytes to unblock pipe
- await self.read(-1)
- # wait for log task to complete that reads the remaining data from stderr
- with suppress(TimeoutError):
- await asyncio.wait_for(self._logger_task, 5)
- await super().close(False)
-
- async def _log_reader_task(self) -> None:
- """Read ffmpeg log from stderr."""
- decode_errors = 0
- async for line in self.iter_stderr():
- if self.collect_log_history:
- self.log_history.append(line)
- if "error" in line or "warning" in line:
- self.logger.debug(line)
- elif "critical" in line:
- self.logger.warning(line)
- else:
- self.logger.log(VERBOSE_LOG_LEVEL, line)
-
- if "Invalid data found when processing input" in line:
- decode_errors += 1
- if decode_errors >= 50:
- self.logger.error(line)
- await super().close(True)
-
- # if streamdetails contenttype is unknown, try parse it from the ffmpeg log
- if line.startswith("Stream #") and ": Audio: " in line:
- if self.input_format.content_type == ContentType.UNKNOWN:
- content_type_raw = line.split(": Audio: ")[1].split(" ")[0]
- content_type = ContentType.try_parse(content_type_raw)
- self.logger.debug(
- "Detected (input) content type: %s (%s)", content_type, content_type_raw
- )
- self.input_format.content_type = content_type
- del line
-
- async def _feed_stdin(self) -> None:
- """Feed stdin with audio chunks from an AsyncGenerator."""
- if TYPE_CHECKING:
- self.audio_input: AsyncGenerator[bytes, None]
- try:
- async for chunk in self.audio_input:
- await self.write(chunk)
- # write EOF once we've reached the end of the input stream
- await self.write_eof()
- except Exception as err:
- if isinstance(err, asyncio.CancelledError):
- return
- # make sure we dont swallow any exceptions and we bail out
- # once our audio source fails.
- self.logger.error(
- "Stream error: %s",
- str(err),
- exc_info=err if self.logger.isEnabledFor(VERBOSE_LOG_LEVEL) else None,
- )
- await self.write_eof()
-
-
async def crossfade_pcm_parts(
fade_in_part: bytes,
fade_out_part: bytes,
streamdetails.fade_in = fade_in
if not streamdetails.duration:
streamdetails.duration = queue_item.duration
+
# handle volume normalization details
if result := await mass.music.get_loudness(
streamdetails.item_id,
streamdetails.loudness, streamdetails.loudness_album = result
streamdetails.prefer_album_loudness = prefer_album_loudness
player_settings = await mass.config.get_player_config(streamdetails.queue_id)
- streamdetails.enable_volume_normalization = player_settings.get_value(CONF_VOLUME_NORMALIZATION)
- streamdetails.target_loudness = player_settings.get_value(CONF_VOLUME_NORMALIZATION_TARGET)
-
- radio_norm_pref = await mass.config.get_core_config_value(
- "streams", CONF_VOLUME_NORMALIZATION_RADIO
+ core_config = await mass.config.get_core_config("streams")
+ streamdetails.volume_normalization_mode = _get_normalization_mode(
+ core_config, player_settings, streamdetails
)
- if streamdetails.media_type == MediaType.RADIO and radio_norm_pref == "disabled":
- streamdetails.enable_volume_normalization = False
- elif streamdetails.media_type == MediaType.RADIO and radio_norm_pref == "dynamic":
- streamdetails.force_dynamic_volume_normalization = True
+ streamdetails.target_loudness = player_settings.get_value(CONF_VOLUME_NORMALIZATION_TARGET)
process_time = int((time.time() - time_start) * 1000)
LOGGER.debug("retrieved streamdetails for %s in %s milliseconds", queue_item.uri, process_time)
chunk_number = 0
buffer: bytes = b""
finished = False
+
+ ffmpeg_proc = FFMpeg(
+ audio_input=audio_source,
+ input_format=streamdetails.audio_format,
+ output_format=pcm_format,
+ filter_params=filter_params,
+ extra_input_args=extra_input_args,
+ collect_log_history=True,
+ )
try:
- async with FFMpeg(
- audio_input=audio_source,
- input_format=streamdetails.audio_format,
- output_format=pcm_format,
- filter_params=filter_params,
- extra_input_args=extra_input_args,
- collect_log_history=True,
- logger=logger,
- ) as ffmpeg_proc:
- async for chunk in ffmpeg_proc.iter_chunked(pcm_format.pcm_sample_size):
- # for radio streams we just yield all chunks directly
- if streamdetails.media_type == MediaType.RADIO:
- yield chunk
- bytes_sent += len(chunk)
- continue
+ await ffmpeg_proc.start()
+ async for chunk in ffmpeg_proc.iter_chunked(pcm_format.pcm_sample_size):
+ # for radio streams we just yield all chunks directly
+ if streamdetails.media_type == MediaType.RADIO:
+ yield chunk
+ bytes_sent += len(chunk)
+ continue
- chunk_number += 1
- # determine buffer size dynamically
- if chunk_number < 5 and strip_silence_begin:
- req_buffer_size = int(pcm_format.pcm_sample_size * 4)
- elif chunk_number > 30 and strip_silence_end:
- req_buffer_size = int(pcm_format.pcm_sample_size * 8)
- else:
- req_buffer_size = int(pcm_format.pcm_sample_size * 2)
-
- # always append to buffer
- buffer += chunk
- del chunk
-
- if len(buffer) < req_buffer_size:
- # buffer is not full enough, move on
- continue
+ chunk_number += 1
+ # determine buffer size dynamically
+ if chunk_number < 5 and strip_silence_begin:
+ req_buffer_size = int(pcm_format.pcm_sample_size * 4)
+ elif chunk_number > 30 and strip_silence_end:
+ req_buffer_size = int(pcm_format.pcm_sample_size * 8)
+ else:
+ req_buffer_size = int(pcm_format.pcm_sample_size * 2)
- if chunk_number == 5 and strip_silence_begin:
- # strip silence from begin of audio
- chunk = await strip_silence( # noqa: PLW2901
- mass, buffer, pcm_format=pcm_format
- )
- bytes_sent += len(chunk)
- yield chunk
- buffer = b""
- continue
+ # always append to buffer
+ buffer += chunk
+ del chunk
+
+ if len(buffer) < req_buffer_size:
+ # buffer is not full enough, move on
+ continue
- #### OTHER: enough data in buffer, feed to output
- while len(buffer) > req_buffer_size:
- yield buffer[: pcm_format.pcm_sample_size]
- bytes_sent += pcm_format.pcm_sample_size
- buffer = buffer[pcm_format.pcm_sample_size :]
-
- # end of audio/track reached
- if strip_silence_end and buffer:
- # strip silence from end of audio
- buffer = await strip_silence(
- mass,
- buffer,
- pcm_format=pcm_format,
- reverse=True,
+ if chunk_number == 5 and strip_silence_begin:
+ # strip silence from begin of audio
+ chunk = await strip_silence( # noqa: PLW2901
+ mass, buffer, pcm_format=pcm_format
)
- # send remaining bytes in buffer
- bytes_sent += len(buffer)
- yield buffer
- del buffer
-
- if bytes_sent == 0:
- # edge case: no audio data was sent
- streamdetails.stream_error = True
- finished = False
- logger.warning("Stream error on %s", streamdetails.uri)
- # we send a bit of silence so players get at least some data
- # without it, some players refuse to skip to the next track
- async for chunk in get_silence(6, pcm_format):
- yield chunk
- bytes_sent += len(chunk)
- else:
- finished = True
+ bytes_sent += len(chunk)
+ yield chunk
+ buffer = b""
+ continue
+
+ #### OTHER: enough data in buffer, feed to output
+ while len(buffer) > req_buffer_size:
+ yield buffer[: pcm_format.pcm_sample_size]
+ bytes_sent += pcm_format.pcm_sample_size
+ buffer = buffer[pcm_format.pcm_sample_size :]
+
+ # end of audio/track reached
+ if strip_silence_end and buffer:
+ # strip silence from end of audio
+ buffer = await strip_silence(
+ mass,
+ buffer,
+ pcm_format=pcm_format,
+ reverse=True,
+ )
+ # send remaining bytes in buffer
+ bytes_sent += len(buffer)
+ yield buffer
+ del buffer
+
+ if bytes_sent == 0:
+ # edge case: no audio data was sent
+ streamdetails.stream_error = True
+ finished = False
+ logger.warning("Stream error on %s", streamdetails.uri)
+ # we send a bit of silence so players get at least some data
+ # without it, some players refuse to skip to the next track
+ async for chunk in get_silence(6, pcm_format):
+ yield chunk
+ bytes_sent += len(chunk)
+ else:
+ finished = True
finally:
- if "ffmpeg_proc" not in locals():
- # edge case: ffmpeg process was not yet started
- return # noqa: B012
- if finished and not ffmpeg_proc.closed:
- await asyncio.wait_for(ffmpeg_proc.wait(), 60)
- elif not ffmpeg_proc.closed:
- await ffmpeg_proc.close()
+ await ffmpeg_proc.close()
# try to determine how many seconds we've streamed
seconds_streamed = bytes_sent / pcm_format.pcm_sample_size if bytes_sent else 0
streamdetails.duration = seconds_streamed
# parse loudnorm data if we have that collected
- required_seconds = 600 if streamdetails.media_type == MediaType.RADIO else 120
- if streamdetails.loudness is None and (finished or (seconds_streamed >= required_seconds)):
- loudness_details = parse_loudnorm(" ".join(ffmpeg_proc.log_history))
- if loudness_details is not None:
+ if (
+ streamdetails.loudness is None
+ and streamdetails.volume_normalization_mode != VolumeNormalizationMode.DISABLED
+ and (finished or (seconds_streamed >= 60))
+ ):
+ # if dynamic volume normalization is enabled and the entire track is streamed
+ # the loudnorm filter will output the measuremeet in the log,
+ # so we can use those directly instead of analyzing the audio
+ if loudness_details := parse_loudnorm(" ".join(ffmpeg_proc.log_history)):
logger.debug(
"Loudness measurement for %s: %s dB",
streamdetails.uri,
media_type=streamdetails.media_type,
)
)
+ else:
+ # no data from loudnorm filter found, we need to analyze the audio
+ # add background task to start analyzing the audio
+ task_id = f"analyze_loudness_{streamdetails.uri}"
+ mass.create_task(analyze_loudness, mass, streamdetails, task_id=task_id)
+
# report playback
if finished or seconds_streamed > 30:
mass.create_task(
yield data
-async def get_ffmpeg_stream(
- audio_input: AsyncGenerator[bytes, None] | str,
- input_format: AudioFormat,
- output_format: AudioFormat,
- filter_params: list[str] | None = None,
- extra_args: list[str] | None = None,
- chunk_size: int | None = None,
- extra_input_args: list[str] | None = None,
- logger: logging.Logger | None = None,
-) -> AsyncGenerator[bytes, None]:
- """
- Get the ffmpeg audio stream as async generator.
-
- Takes care of resampling and/or recoding if needed,
- according to player preferences.
- """
- async with FFMpeg(
- audio_input=audio_input,
- input_format=input_format,
- output_format=output_format,
- filter_params=filter_params,
- extra_args=extra_args,
- extra_input_args=extra_input_args,
- logger=logger,
- ) as ffmpeg_proc:
- # read final chunks from stdout
- iterator = (
- ffmpeg_proc.iter_chunked(chunk_size)
- if chunk_size
- else ffmpeg_proc.iter_any(get_chunksize(output_format))
- )
- async for chunk in iterator:
- yield chunk
-
-
async def check_audio_support() -> tuple[bool, bool, str]:
"""Check if ffmpeg is present (with/without libsoxr support)."""
# check for FFmpeg presence
if fmt.content_type in (ContentType.WAV, ContentType.AIFF, ContentType.DSF):
return pcm_size
if fmt.content_type in (ContentType.FLAC, ContentType.WAVPACK, ContentType.ALAC):
- return int(pcm_size * 0.5)
+ return int(pcm_size * 0.8)
if fmt.content_type in (ContentType.MP3, ContentType.OGG):
return int((320000 / 8) * seconds)
if fmt.content_type in (ContentType.AAC, ContentType.M4A):
return filter_params
-def get_ffmpeg_args(
- input_format: AudioFormat,
- output_format: AudioFormat,
- filter_params: list[str],
- extra_args: list[str] | None = None,
- input_path: str = "-",
- output_path: str = "-",
- extra_input_args: list[str] | None = None,
- loglevel: str = "error",
-) -> list[str]:
- """Collect all args to send to the ffmpeg process."""
- if extra_args is None:
- extra_args = []
- ffmpeg_present, libsoxr_support, version = get_global_cache_value("ffmpeg_support")
- if not ffmpeg_present:
- msg = (
- "FFmpeg binary is missing from system."
- "Please install ffmpeg on your OS to enable playback."
- )
- raise AudioError(
- msg,
- )
-
- major_version = int("".join(char for char in version.split(".")[0] if not char.isalpha()))
-
- # generic args
- generic_args = [
- "ffmpeg",
- "-hide_banner",
- "-loglevel",
- loglevel,
- "-nostats",
- "-ignore_unknown",
- "-protocol_whitelist",
- "file,hls,http,https,tcp,tls,crypto,pipe,data,fd,rtp,udp",
- ]
- # collect input args
- input_args = []
- if extra_input_args:
- input_args += extra_input_args
- if input_path.startswith("http"):
- # append reconnect options for direct stream from http
- input_args += [
- "-reconnect",
- "1",
- "-reconnect_streamed",
- "1",
- ]
- if major_version > 4:
- # these options are only supported in ffmpeg > 5
- input_args += [
- "-reconnect_on_network_error",
- "1",
- "-reconnect_on_http_error",
- "5xx,4xx",
- ]
- if input_format.content_type.is_pcm():
- input_args += [
- "-ac",
- str(input_format.channels),
- "-channel_layout",
- "mono" if input_format.channels == 1 else "stereo",
- "-ar",
- str(input_format.sample_rate),
- "-acodec",
- input_format.content_type.name.lower(),
- "-f",
- input_format.content_type.value,
- "-i",
- input_path,
- ]
- else:
- # let ffmpeg auto detect the content type from the metadata/headers
- input_args += ["-i", input_path]
-
- # collect output args
- output_args = []
- if output_path.upper() == "NULL":
- # devnull stream
- output_args = ["-f", "null", "-"]
- elif output_format.content_type == ContentType.UNKNOWN:
- raise RuntimeError("Invalid output format specified")
- elif output_format.content_type == ContentType.AAC:
- output_args = ["-f", "adts", "-c:a", "aac", "-b:a", "256k", output_path]
- elif output_format.content_type == ContentType.MP3:
- output_args = ["-f", "mp3", "-b:a", "320k", output_path]
- else:
- if output_format.content_type.is_pcm():
- output_args += ["-acodec", output_format.content_type.name.lower()]
- # use explicit format identifier for all other
- output_args += [
- "-f",
- output_format.content_type.value,
- "-ar",
- str(output_format.sample_rate),
- "-ac",
- str(output_format.channels),
- ]
- if output_format.output_format_str == "flac":
- output_args += ["-compression_level", "6"]
- output_args += [output_path]
-
- # edge case: source file is not stereo - downmix to stereo
- if input_format.channels > 2 and output_format.channels == 2:
- filter_params = [
- "pan=stereo|FL=1.0*FL+0.707*FC+0.707*SL+0.707*LFE|FR=1.0*FR+0.707*FC+0.707*SR+0.707*LFE",
- *filter_params,
- ]
-
- # determine if we need to do resampling
- if (
- input_format.sample_rate != output_format.sample_rate
- or input_format.bit_depth > output_format.bit_depth
- ):
- # prefer resampling with libsoxr due to its high quality
- if libsoxr_support:
- resample_filter = "aresample=resampler=soxr:precision=30"
- else:
- resample_filter = "aresample=resampler=swr"
-
- # sample rate conversion
- if input_format.sample_rate != output_format.sample_rate:
- resample_filter += f":osr={output_format.sample_rate}"
-
- # bit depth conversion: apply dithering when going down to 16 bits
- if output_format.bit_depth == 16 and input_format.bit_depth > 16:
- resample_filter += ":osf=s16:dither_method=triangular_hp"
-
- filter_params.append(resample_filter)
-
- if filter_params and "-filter_complex" not in extra_args:
- extra_args += ["-af", ",".join(filter_params)]
-
- return generic_args + input_args + extra_args + output_args
-
-
def parse_loudnorm(raw_stderr: bytes | str) -> float | None:
"""Parse Loudness measurement from ffmpeg stderr output."""
stderr_data = raw_stderr.decode() if isinstance(raw_stderr, bytes) else raw_stderr
except JSON_DECODE_EXCEPTIONS:
return None
return float(loudness_data["input_i"])
+
+
+async def analyze_loudness(
+ mass: MusicAssistant,
+ streamdetails: StreamDetails,
+) -> None:
+ """Analyze media item's audio, to calculate EBU R128 loudness."""
+ if result := await mass.music.get_loudness(
+ streamdetails.item_id,
+ streamdetails.provider,
+ media_type=streamdetails.media_type,
+ ):
+ # only when needed we do the analyze job
+ streamdetails.loudness = result
+ return
+
+ logger = LOGGER.getChild("analyze_loudness")
+ logger.debug("Start analyzing audio for %s", streamdetails.uri)
+
+ extra_input_args = [
+ # limit to 10 minutes to reading too much in memory
+ "-t",
+ "600",
+ ]
+ if streamdetails.stream_type == StreamType.CUSTOM:
+ audio_source = mass.get_provider(streamdetails.provider).get_audio_stream(
+ streamdetails,
+ )
+ elif streamdetails.stream_type == StreamType.HLS:
+ substream = await get_hls_substream(mass, streamdetails.path)
+ audio_source = substream.path
+ elif streamdetails.stream_type == StreamType.ENCRYPTED_HTTP:
+ audio_source = streamdetails.path
+ extra_input_args += ["-decryption_key", streamdetails.decryption_key]
+ else:
+ audio_source = streamdetails.path
+
+ # calculate BS.1770 R128 integrated loudness with ffmpeg
+ async with FFMpeg(
+ audio_input=audio_source,
+ input_format=streamdetails.audio_format,
+ output_format=streamdetails.audio_format,
+ audio_output="NULL",
+ filter_params=["ebur128=framelog=verbose"],
+ extra_input_args=extra_input_args,
+ collect_log_history=True,
+ ) as ffmpeg_proc:
+ await ffmpeg_proc.wait()
+ log_lines = ffmpeg_proc.log_history
+ log_lines_str = "\n".join(log_lines)
+ try:
+ loudness_str = (
+ log_lines_str.split("Integrated loudness")[1].split("I:")[1].split("LUFS")[0]
+ )
+ loudness = float(loudness_str.strip())
+ except (IndexError, ValueError, AttributeError):
+ LOGGER.warning(
+ "Could not determine integrated loudness of %s - %s",
+ streamdetails.uri,
+ log_lines_str or "received empty value",
+ )
+ else:
+ streamdetails.loudness = loudness
+ await mass.music.set_loudness(
+ streamdetails.item_id,
+ streamdetails.provider,
+ loudness,
+ media_type=streamdetails.media_type,
+ )
+ logger.debug(
+ "Integrated loudness of %s is: %s",
+ streamdetails.uri,
+ loudness,
+ )
+
+
+def _get_normalization_mode(
+ core_config: CoreConfig, player_config: PlayerConfig, streamdetails: StreamDetails
+) -> VolumeNormalizationMode:
+ if not player_config.get_value(CONF_VOLUME_NORMALIZATION):
+ # disabled for this player
+ return VolumeNormalizationMode.DISABLED
+ # work out preference for track or radio
+ preference = VolumeNormalizationMode(
+ core_config.get_value(
+ CONF_VOLUME_NORMALIZATION_RADIO
+ if streamdetails.media_type == MediaType.RADIO
+ else CONF_VOLUME_NORMALIZATION_TRACKS,
+ )
+ )
+
+ # handle no measurement available but fallback to dynamic mode is allowed
+ if streamdetails.loudness is None and preference == VolumeNormalizationMode.FALLBACK_DYNAMIC:
+ return VolumeNormalizationMode.DYNAMIC
+
+ # handle no measurement available and no fallback allowed
+ if streamdetails.loudness is None and preference == VolumeNormalizationMode.MEASUREMENT_ONLY:
+ return VolumeNormalizationMode.DISABLED
+
+ # handle no measurement available and fallback to fixed gain is allowed
+ if streamdetails.loudness is None and preference == VolumeNormalizationMode.FALLBACK_FIXED_GAIN:
+ return VolumeNormalizationMode.FIXED_GAIN
+
+ # simply return the preference
+ return preference
--- /dev/null
+"""FFMpeg related helpers."""
+
+from __future__ import annotations
+
+import asyncio
+import logging
+from collections import deque
+from collections.abc import AsyncGenerator
+from typing import TYPE_CHECKING
+
+from music_assistant.common.helpers.global_cache import get_global_cache_value
+from music_assistant.common.models.errors import AudioError
+from music_assistant.common.models.media_items import AudioFormat, ContentType
+from music_assistant.constants import VERBOSE_LOG_LEVEL
+
+from .process import AsyncProcess
+from .util import close_async_generator
+
+LOGGER = logging.getLogger("ffmpeg")
+
+
+class FFMpeg(AsyncProcess):
+ """FFMpeg wrapped as AsyncProcess."""
+
+ def __init__(
+ self,
+ audio_input: AsyncGenerator[bytes, None] | str | int,
+ input_format: AudioFormat,
+ output_format: AudioFormat,
+ filter_params: list[str] | None = None,
+ extra_args: list[str] | None = None,
+ extra_input_args: list[str] | None = None,
+ audio_output: str | int = "-",
+ collect_log_history: bool = False,
+ ) -> None:
+ """Initialize AsyncProcess."""
+ ffmpeg_args = get_ffmpeg_args(
+ input_format=input_format,
+ output_format=output_format,
+ filter_params=filter_params or [],
+ extra_args=extra_args or [],
+ input_path=audio_input if isinstance(audio_input, str) else "-",
+ output_path=audio_output if isinstance(audio_output, str) else "-",
+ extra_input_args=extra_input_args or [],
+ loglevel="info",
+ )
+ self.audio_input = audio_input
+ self.input_format = input_format
+ self.collect_log_history = collect_log_history
+ self.log_history: deque[str] = deque(maxlen=100)
+ self._stdin_task: asyncio.Task | None = None
+ self._logger_task: asyncio.Task | None = None
+ super().__init__(
+ ffmpeg_args,
+ stdin=True if isinstance(audio_input, str | AsyncGenerator) else audio_input,
+ stdout=True if isinstance(audio_output, str) else audio_output,
+ stderr=True,
+ )
+ self.logger = LOGGER
+
+ async def start(self) -> None:
+ """Perform Async init of process."""
+ await super().start()
+ if self.proc:
+ self.logger = LOGGER.getChild(str(self.proc.pid))
+ clean_args = []
+ for arg in self._args[1:]:
+ if arg.startswith("http"):
+ clean_args.append("<URL>")
+ elif "/" in arg and "." in arg:
+ clean_args.append("<FILE>")
+ else:
+ clean_args.append(arg)
+ args_str = " ".join(clean_args)
+ self.logger.log(VERBOSE_LOG_LEVEL, "started with args: %s", args_str)
+ self._logger_task = asyncio.create_task(self._log_reader_task())
+ if isinstance(self.audio_input, AsyncGenerator):
+ self._stdin_task = asyncio.create_task(self._feed_stdin())
+
+ async def close(self, send_signal: bool = True) -> None:
+ """Close/terminate the process and wait for exit."""
+ if self.closed:
+ return
+ if self._stdin_task and not self._stdin_task.done():
+ self._stdin_task.cancel()
+ await super().close(send_signal)
+
+ async def _log_reader_task(self) -> None:
+ """Read ffmpeg log from stderr."""
+ decode_errors = 0
+ async for line in self.iter_stderr():
+ if self.collect_log_history:
+ self.log_history.append(line)
+ if "error" in line or "warning" in line:
+ self.logger.debug(line)
+ elif "critical" in line:
+ self.logger.warning(line)
+ else:
+ self.logger.log(VERBOSE_LOG_LEVEL, line)
+
+ if "Invalid data found when processing input" in line:
+ decode_errors += 1
+ if decode_errors >= 50:
+ self.logger.error(line)
+ await super().close(True)
+
+ # if streamdetails contenttype is unknown, try parse it from the ffmpeg log
+ if line.startswith("Stream #") and ": Audio: " in line:
+ if self.input_format.content_type == ContentType.UNKNOWN:
+ content_type_raw = line.split(": Audio: ")[1].split(" ")[0]
+ content_type = ContentType.try_parse(content_type_raw)
+ self.logger.debug(
+ "Detected (input) content type: %s (%s)", content_type, content_type_raw
+ )
+ self.input_format.content_type = content_type
+ del line
+
+ async def _feed_stdin(self) -> None:
+ """Feed stdin with audio chunks from an AsyncGenerator."""
+ if TYPE_CHECKING:
+ self.audio_input: AsyncGenerator[bytes, None]
+ generator_exhausted = False
+ try:
+ async for chunk in self.audio_input:
+ await self.write(chunk)
+ generator_exhausted = True
+ except Exception as err:
+ if isinstance(err, asyncio.CancelledError):
+ return
+ self.logger.error(
+ "Stream error: %s",
+ str(err),
+ exc_info=err if self.logger.isEnabledFor(VERBOSE_LOG_LEVEL) else None,
+ )
+ finally:
+ await self.write_eof()
+ # we need to ensure that we close the async generator
+ # if we get cancelled otherwise it keeps lingering forever
+ if not generator_exhausted:
+ await close_async_generator(self.audio_input)
+
+
+async def get_ffmpeg_stream(
+ audio_input: AsyncGenerator[bytes, None] | str,
+ input_format: AudioFormat,
+ output_format: AudioFormat,
+ filter_params: list[str] | None = None,
+ extra_args: list[str] | None = None,
+ chunk_size: int | None = None,
+ extra_input_args: list[str] | None = None,
+) -> AsyncGenerator[bytes, None]:
+ """
+ Get the ffmpeg audio stream as async generator.
+
+ Takes care of resampling and/or recoding if needed,
+ according to player preferences.
+ """
+ async with FFMpeg(
+ audio_input=audio_input,
+ input_format=input_format,
+ output_format=output_format,
+ filter_params=filter_params,
+ extra_args=extra_args,
+ extra_input_args=extra_input_args,
+ ) as ffmpeg_proc:
+ # read final chunks from stdout
+ iterator = ffmpeg_proc.iter_chunked(chunk_size) if chunk_size else ffmpeg_proc.iter_any()
+ async for chunk in iterator:
+ yield chunk
+
+
+def get_ffmpeg_args(
+ input_format: AudioFormat,
+ output_format: AudioFormat,
+ filter_params: list[str],
+ extra_args: list[str] | None = None,
+ input_path: str = "-",
+ output_path: str = "-",
+ extra_input_args: list[str] | None = None,
+ loglevel: str = "error",
+) -> list[str]:
+ """Collect all args to send to the ffmpeg process."""
+ if extra_args is None:
+ extra_args = []
+ ffmpeg_present, libsoxr_support, version = get_global_cache_value("ffmpeg_support")
+ if not ffmpeg_present:
+ msg = (
+ "FFmpeg binary is missing from system."
+ "Please install ffmpeg on your OS to enable playback."
+ )
+ raise AudioError(
+ msg,
+ )
+
+ major_version = int("".join(char for char in version.split(".")[0] if not char.isalpha()))
+
+ # generic args
+ generic_args = [
+ "ffmpeg",
+ "-hide_banner",
+ "-loglevel",
+ loglevel,
+ "-nostats",
+ "-ignore_unknown",
+ "-protocol_whitelist",
+ "file,hls,http,https,tcp,tls,crypto,pipe,data,fd,rtp,udp",
+ ]
+ # collect input args
+ input_args = []
+ if extra_input_args:
+ input_args += extra_input_args
+ if input_path.startswith("http"):
+ # append reconnect options for direct stream from http
+ input_args += [
+ "-reconnect",
+ "1",
+ "-reconnect_streamed",
+ "1",
+ ]
+ if major_version > 4:
+ # these options are only supported in ffmpeg > 5
+ input_args += [
+ "-reconnect_on_network_error",
+ "1",
+ "-reconnect_on_http_error",
+ "5xx,4xx",
+ ]
+ if input_format.content_type.is_pcm():
+ input_args += [
+ "-ac",
+ str(input_format.channels),
+ "-channel_layout",
+ "mono" if input_format.channels == 1 else "stereo",
+ "-ar",
+ str(input_format.sample_rate),
+ "-acodec",
+ input_format.content_type.name.lower(),
+ "-f",
+ input_format.content_type.value,
+ "-i",
+ input_path,
+ ]
+ else:
+ # let ffmpeg auto detect the content type from the metadata/headers
+ input_args += ["-i", input_path]
+
+ # collect output args
+ output_args = []
+ if output_path.upper() == "NULL":
+ # devnull stream
+ output_args = ["-f", "null", "-"]
+ elif output_format.content_type == ContentType.UNKNOWN:
+ raise RuntimeError("Invalid output format specified")
+ elif output_format.content_type == ContentType.AAC:
+ output_args = ["-f", "adts", "-c:a", "aac", "-b:a", "256k", output_path]
+ elif output_format.content_type == ContentType.MP3:
+ output_args = ["-f", "mp3", "-b:a", "320k", output_path]
+ else:
+ if output_format.content_type.is_pcm():
+ output_args += ["-acodec", output_format.content_type.name.lower()]
+ # use explicit format identifier for all other
+ output_args += [
+ "-f",
+ output_format.content_type.value,
+ "-ar",
+ str(output_format.sample_rate),
+ "-ac",
+ str(output_format.channels),
+ ]
+ if output_format.output_format_str == "flac":
+ output_args += ["-compression_level", "6"]
+ output_args += [output_path]
+
+ # edge case: source file is not stereo - downmix to stereo
+ if input_format.channels > 2 and output_format.channels == 2:
+ filter_params = [
+ "pan=stereo|FL=1.0*FL+0.707*FC+0.707*SL+0.707*LFE|FR=1.0*FR+0.707*FC+0.707*SR+0.707*LFE",
+ *filter_params,
+ ]
+
+ # determine if we need to do resampling
+ if (
+ input_format.sample_rate != output_format.sample_rate
+ or input_format.bit_depth > output_format.bit_depth
+ ):
+ # prefer resampling with libsoxr due to its high quality
+ if libsoxr_support:
+ resample_filter = "aresample=resampler=soxr:precision=30"
+ else:
+ resample_filter = "aresample=resampler=swr"
+
+ # sample rate conversion
+ if input_format.sample_rate != output_format.sample_rate:
+ resample_filter += f":osr={output_format.sample_rate}"
+
+ # bit depth conversion: apply dithering when going down to 16 bits
+ if output_format.bit_depth == 16 and input_format.bit_depth > 16:
+ resample_filter += ":osf=s16:dither_method=triangular_hp"
+
+ filter_params.append(resample_filter)
+
+ if filter_params and "-filter_complex" not in extra_args:
+ extra_args += ["-af", ",".join(filter_params)]
+
+ return generic_args + input_args + extra_args + output_args
import urllib.error
import urllib.parse
import urllib.request
-from collections.abc import Awaitable, Callable, Coroutine
+from collections.abc import AsyncGenerator, Awaitable, Callable, Coroutine
+from contextlib import suppress
from functools import lru_cache
from importlib.metadata import PackageNotFoundError
from importlib.metadata import version as pkg_version
return discovery_info.port
+async def close_async_generator(agen: AsyncGenerator[Any, None]) -> None:
+ """Force close an async generator."""
+ task = asyncio.create_task(agen.__anext__())
+ task.cancel()
+ with suppress(asyncio.CancelledError):
+ await task
+ await agen.aclose()
+
+
class TaskManager:
"""
Helper class to run many tasks at once.
output_format=AIRPLAY_PCM_FORMAT,
filter_params=get_player_filter_params(self.mass, player_id),
audio_output=write,
- logger=self.airplay_player.logger.getChild("ffmpeg"),
)
await self._ffmpeg_proc.start()
await asyncio.to_thread(os.close, write)
from music_assistant.common.models.config_entries import (
CONF_ENTRY_CROSSFADE,
- CONF_ENTRY_CROSSFADE_FLOW_MODE_REQUIRED,
CONF_ENTRY_ENABLE_ICY_METADATA,
CONF_ENTRY_ENFORCE_MP3,
- CONF_ENTRY_FLOW_MODE_DEFAULT_ENABLED,
+ CONF_ENTRY_FLOW_MODE_ENFORCED,
CONF_ENTRY_HTTP_PROFILE_FORCED_2,
ConfigEntry,
ConfigValueType,
async def handle_async_init(self) -> None:
"""Handle async initialization of the provider."""
- self.bluos_players: dict[str, BluosPlayer] = {}
+ self.bluos_players: dict[str, BluesoundPlayer] = {}
async def on_mdns_service_state_change(
self, name: str, state_change: ServiceStateChange, info: AsyncServiceInfo | None
) -> tuple[ConfigEntry, ...]:
"""Return Config Entries for the given player."""
base_entries = await super().get_player_config_entries(self.player_id)
- if not self.bluos_players.get(self.player_id):
+ if not self.bluos_players.get(player_id):
# TODO fix player entries
return (*base_entries, CONF_ENTRY_CROSSFADE)
return (
*base_entries,
CONF_ENTRY_HTTP_PROFILE_FORCED_2,
CONF_ENTRY_CROSSFADE,
- CONF_ENTRY_CROSSFADE_FLOW_MODE_REQUIRED,
CONF_ENTRY_ENFORCE_MP3,
- CONF_ENTRY_FLOW_MODE_DEFAULT_ENABLED,
+ CONF_ENTRY_FLOW_MODE_ENFORCED,
CONF_ENTRY_ENABLE_ICY_METADATA,
)
mass_player.volume_mute = muted
await bluos_player.update_attributes()
- async def play_media(
- self, player_id: str, media: PlayerMedia, timeout: float | None = None
- ) -> None:
+ async def play_media(self, player_id: str, media: PlayerMedia) -> None:
"""Handle PLAY MEDIA for BluOS player using the provided URL."""
mass_player = self.mass.players.get(player_id)
if bluos_player := self.bluos_players[player_id]:
- play_status = await bluos_player.client.play_url(media.uri, timeout=timeout)
+ play_status = await bluos_player.client.play_url(media.uri)
if play_status == "stream":
# Update media info then optimistically override playback state and source
await bluos_player.update_attributes()
media.uri = media.uri.replace(".flac", ".mp3")
didl_metadata = create_didl_metadata(media)
title = media.title or media.uri
- await dlna_player.device.async_set_next_transport_uri(media.uri, title, didl_metadata)
- self.logger.debug(
- "Enqued next track (%s) to player %s",
- title,
- dlna_player.player.display_name,
- )
+ try:
+ await dlna_player.device.async_set_next_transport_uri(media.uri, title, didl_metadata)
+ except UpnpError:
+ self.logger.error(
+ "Enqueuing the next track failed for player %s - "
+ "the player probably doesn't support this. "
+ "Enable 'flow mode' for this player.",
+ dlna_player.player.display_name,
+ )
+ else:
+ self.logger.debug(
+ "Enqued next track (%s) to player %s",
+ title,
+ dlna_player.player.display_name,
+ )
@catch_request_errors
async def cmd_pause(self, player_id: str) -> None:
output_format=DEFAULT_SNAPCAST_FORMAT,
filter_params=get_player_filter_params(self.mass, player_id),
audio_output=stream_path,
- logger=self.logger.getChild("ffmpeg"),
) as ffmpeg_proc:
player.state = PlayerState.PLAYING
player.current_media = media
else:
# player is group child (synced to another player)
group_parent = self.prov.sonos_players.get(self.client.player.group.coordinator_id)
- if not group_parent:
+ if not group_parent or not group_parent.client:
# handle race condition where the group parent is not yet discovered
return
active_group = group_parent.client.player.group
chunk_size = get_chunksize(streamdetails.audio_format)
stderr = None if self.logger.isEnabledFor(VERBOSE_LOG_LEVEL) else False
self.logger.log(VERBOSE_LOG_LEVEL, f"Start streaming {spotify_uri} using librespot")
- async with AsyncProcess(
- args,
- stdout=True,
- stderr=stderr,
- name="librespot",
- ) as librespot_proc:
- async for chunk in librespot_proc.iter_any(chunk_size):
- yield chunk
+ for attempt in range(1, 3):
+ async with AsyncProcess(
+ args,
+ stdout=True,
+ stderr=stderr,
+ name="librespot",
+ ) as librespot_proc:
+ chunks_received = 0
+ async for chunk in librespot_proc.iter_any(chunk_size):
+ yield chunk
+ chunks_received += 1
+ if chunks_received:
+ break
+ self.logger.warning(
+ "librespot failed to stream track, retrying... (attempt %s/3)", attempt
+ )
+ await asyncio.sleep(0.5)
def _parse_artist(self, artist_obj):
"""Parse spotify artist object to generic layout."""