from music_assistant.server import MusicAssistant
LOGGER = logging.getLogger(f"{ROOT_LOGGER_NAME}.audio")
-analyze_jobs: set[str] = set()
# pylint:disable=consider-using-f-string,too-many-locals,too-many-statements
return stripped_data
-async def analyze_loudness(mass: MusicAssistant, streamdetails: StreamDetails) -> None:
- """Analyze track audio to calculate EBU R128 loudness."""
- if streamdetails.uri in analyze_jobs:
- return
- if len(analyze_jobs) >= 5:
- LOGGER.debug("Skip analyzing EBU R128 loudness: max number of jobs reached")
- return
- try:
- analyze_jobs.add(streamdetails.uri)
- item_name = f"{streamdetails.provider}/{streamdetails.item_id}"
- LOGGER.debug("Start analyzing EBU R128 loudness for %s", item_name)
- # calculate EBU R128 integrated loudness with ffmpeg
- ffmpeg_args = get_ffmpeg_args(
- input_format=streamdetails.audio_format,
- output_format=streamdetails.audio_format,
- filter_params=["loudnorm=print_format=json"],
- extra_args=["-t", "600"], # limit to 10 minutes to prevent OOM
- input_path=streamdetails.direct or "-",
- output_path="NULL",
- )
- async with AsyncProcess(
- ffmpeg_args,
- enable_stdin=streamdetails.direct is None,
- enable_stdout=False,
- enable_stderr=True,
- ) as ffmpeg_proc:
- if streamdetails.direct is None:
- music_prov = mass.get_provider(streamdetails.provider)
- chunk_count = 0
- async for audio_chunk in music_prov.get_audio_stream(streamdetails):
- chunk_count += 1
- await ffmpeg_proc.write(audio_chunk)
- if chunk_count == 600:
- # safety guard: max (more or less) 10 minutes of audio may be analyzed!
- break
- await ffmpeg_proc.write_eof()
-
- _, stderr = await ffmpeg_proc.communicate()
- if loudness_details := _parse_loudnorm(stderr):
- LOGGER.debug("Loudness measurement for %s: %s", item_name, loudness_details)
- streamdetails.loudness = loudness_details
- await mass.music.set_track_loudness(
- streamdetails.item_id, streamdetails.provider, loudness_details
- )
- else:
- LOGGER.warning(
- "Could not determine EBU R128 loudness of %s - %s",
- item_name,
- stderr.decode() or "received empty value",
- )
- finally:
- analyze_jobs.discard(streamdetails.uri)
-
-
async def get_stream_details(
mass: MusicAssistant,
queue_item: QueueItem,
finished = False
elif loudness_details := _parse_loudnorm(stderr):
logger.log(VERBOSE_LOG_LEVEL, stderr.decode())
- required_seconds = 300 if streamdetails.media_type == MediaType.RADIO else 60
- if finished or seconds_streamed >= required_seconds:
+ required_seconds = 600 if streamdetails.media_type == MediaType.RADIO else 120
+ if finished or (seconds_streamed >= required_seconds):
LOGGER.debug("Loudness measurement for %s: %s", streamdetails.uri, loudness_details)
streamdetails.loudness = loudness_details
await mass.music.set_track_loudness(
if music_prov := mass.get_provider(streamdetails.provider):
mass.create_task(music_prov.on_streamed(streamdetails, seconds_streamed))
- if not streamdetails.loudness:
- # send loudness analyze job to background worker
- # note that we only do this if a track was at least been partially played
- mass.create_task(analyze_loudness(mass, streamdetails))
-
async def resolve_radio_stream(mass: MusicAssistant, url: str) -> tuple[str, bool]:
"""