CONF_ENTRY_VOLUME_NORMALIZATION_TARGET = ConfigEntry(
key=CONF_VOLUME_NORMALIZATION_TARGET,
type=ConfigEntryType.INTEGER,
- range=(-30, 0),
+ range=(-70, -5),
default_value=-17,
label="Target level for volume normalization",
description="Adjust average (perceived) loudness to this target level",
from __future__ import annotations
from dataclasses import dataclass, field, fields
-from time import time
from typing import Any, Self
from mashumaro import DataClassDictMixin
if media_item["media_type"] == "radio":
return Radio.from_dict(media_item)
return MediaItem.from_dict(media_item)
-
-
-@dataclass(kw_only=True)
-class StreamDetails(DataClassDictMixin):
- """Model for streamdetails."""
-
- # NOTE: the actual provider/itemid of the streamdetails may differ
- # from the connected media_item due to track linking etc.
- # the streamdetails are only used to provide details about the content
- # that is going to be streamed.
-
- # mandatory fields
- provider: str
- item_id: str
- audio_format: AudioFormat
- media_type: MediaType = MediaType.TRACK
-
- # stream_title: radio streams can optionally set this field
- stream_title: str | None = None
- # duration of the item to stream, copied from media_item if omitted
- duration: int | None = None
- # total size in bytes of the item, calculated at eof when omitted
- size: int | None = None
- # expires: timestamp this streamdetails expire
- expires: float = time() + 3600
- # data: provider specific data (not exposed externally)
- data: Any = None
- # if the url/file is supported by ffmpeg directly, use direct stream
- direct: str | None = None
- # bool to indicate that the providers 'get_audio_stream' supports seeking of the item
- can_seek: bool = True
- # callback: optional callback function (or coroutine) to call when the stream completes.
- # needed for streaming provivders to report what is playing
- # receives the streamdetails as only argument from which to grab
- # details such as seconds_streamed.
- callback: Any = None
-
- # the fields below will be set/controlled by the streamcontroller
- queue_id: str | None = None
- seconds_streamed: float | None = None
- seconds_skipped: float | None = None
- gain_correct: float | None = None
- loudness: float | None = None
-
- def __post_serialize__(self, d: dict[Any, Any]) -> dict[Any, Any]:
- """Exclude internal fields from dict."""
- d.pop("data")
- d.pop("direct")
- d.pop("expires")
- d.pop("queue_id")
- d.pop("callback")
- return d
-
- def __str__(self) -> str:
- """Return pretty printable string of object."""
- return self.uri
-
- @property
- def uri(self) -> str:
- """Return uri representation of item."""
- return f"{self.provider}://{self.media_type.value}/{self.item_id}"
import time
from dataclasses import dataclass, field
+from typing import Any, Self
from mashumaro import DataClassDictMixin
def corrected_elapsed_time(self) -> float:
"""Return the corrected/realtime elapsed time."""
return self.elapsed_time + (time.time() - self.elapsed_time_last_updated)
+
+ def to_cache(self) -> dict[str, Any]:
+ """Return the dict that is suitable for storing into the cache db."""
+ d = self.to_dict()
+ d.pop("current_item", None)
+ d.pop("next_item", None)
+ d.pop("index_in_buffer", None)
+ d.pop("announcement_in_progress", None)
+ d.pop("flow_mode", None)
+ d.pop("flow_mode_start_index", None)
+ return d
+
+ @classmethod
+ def from_cache(cls: Self, d: dict[Any, Any]) -> Self:
+ """Restore a PlayerQueue from a cache dict."""
+ d.pop("current_item", None)
+ d.pop("next_item", None)
+ d.pop("index_in_buffer", None)
+ d.pop("announcement_in_progress", None)
+ d.pop("flow_mode", None)
+ d.pop("flow_mode_start_index", None)
+ return cls.from_dict(d)
from __future__ import annotations
from dataclasses import dataclass
+from typing import Any, Self
from uuid import uuid4
from mashumaro import DataClassDictMixin
from .enums import MediaType
-from .media_items import Album, ItemMapping, MediaItemImage, Radio, StreamDetails, Track
+from .media_items import Album, ItemMapping, MediaItemImage, Radio, Track
+from .streamdetails import StreamDetails
@dataclass
if not self.name:
self.name = self.uri
+ def __post_serialize__(self, d: dict[Any, Any]) -> dict[Any, Any]:
+ """Execute action(s) on serialization."""
+ # Exclude internal streamdetails fields from dict
+ if streamdetails := d.get("streamdetails"):
+ streamdetails.pop("data", None)
+ streamdetails.pop("direct", None)
+ streamdetails.pop("expires", None)
+ return d
+
@property
def uri(self) -> str:
"""Return uri for this QueueItem (for logging purposes)."""
image=get_image(media_item),
)
+ def to_cache(self) -> dict[str, Any]:
+ """Return the dict that is suitable for storing into the cache db."""
+ base = self.to_dict()
+ base.pop("streamdetails", None)
+ return base
+
+ @classmethod
+ def from_cache(cls: Self, d: dict[Any, Any]) -> Self:
+ """Restore a QueueItem from a cache dict."""
+ d.pop("streamdetails", None)
+ return cls.from_dict(d)
+
def get_image(media_item: Track | Radio | None) -> MediaItemImage | None:
"""Find the Image for the MediaItem."""
--- /dev/null
+"""Model(s) for streamdetails."""
+
+from __future__ import annotations
+
+from dataclasses import dataclass
+from time import time
+from typing import Any
+
+from mashumaro import DataClassDictMixin
+
+from music_assistant.common.models.enums import MediaType
+from music_assistant.common.models.media_items import AudioFormat
+
+
+@dataclass(kw_only=True)
+class LoudnessMeasurement(DataClassDictMixin):
+ """Model for EBU-R128 loudness measurement details."""
+
+ integrated: float
+ true_peak: float
+ lra: float
+ threshold: float
+
+
+@dataclass(kw_only=True)
+class StreamDetails(DataClassDictMixin):
+ """Model for streamdetails."""
+
+ # NOTE: the actual provider/itemid of the streamdetails may differ
+ # from the connected media_item due to track linking etc.
+ # the streamdetails are only used to provide details about the content
+ # that is going to be streamed.
+
+ # mandatory fields
+ provider: str
+ item_id: str
+ audio_format: AudioFormat
+ media_type: MediaType = MediaType.TRACK
+
+ # stream_title: radio streams can optionally set this field
+ stream_title: str | None = None
+ # duration of the item to stream, copied from media_item if omitted
+ duration: int | None = None
+ # total size in bytes of the item, calculated at eof when omitted
+ size: int | None = None
+ # expires: timestamp this streamdetails expire
+ expires: float = time() + 3600
+ # data: provider specific data (not exposed externally)
+ data: Any = None
+ # direct: if the url/file is supported by ffmpeg directly, use direct stream
+ direct: str | None = None
+ # can_seek: bool to indicate that the providers 'get_audio_stream' supports seeking of the item
+ can_seek: bool = True
+
+ # the fields below will be set/controlled by the streamcontroller
+ loudness: LoudnessMeasurement | None = None
+ queue_id: str | None = None
+ seconds_streamed: float | None = None
+ seconds_skipped: float | None = None
+ target_loudness: float | None = None
+
+ def __str__(self) -> str:
+ """Return pretty printable string of object."""
+ return self.uri
+
+ def __post_serialize__(self, d: dict[Any, Any]) -> dict[Any, Any]:
+ """Execute action(s) on serialization."""
+ d.pop("queue_id", None)
+ d.pop("seconds_streamed", None)
+ d.pop("seconds_skipped", None)
+ d.pop("target_loudness", None)
+ return d
+
+ @property
+ def uri(self) -> str:
+ """Return uri representation of item."""
+ return f"{self.provider}://{self.media_type.value}/{self.item_id}"
API_SCHEMA_VERSION: Final[int] = 23
MIN_SCHEMA_VERSION: Final[int] = 23
-DB_SCHEMA_VERSION: Final[int] = 27
+DB_SCHEMA_VERSION: Final[int] = 28
ROOT_LOGGER_NAME: Final[str] = "music_assistant"
import asyncio
import os
import shutil
-import statistics
from collections.abc import AsyncGenerator
from contextlib import suppress
from itertools import zip_longest
from typing import TYPE_CHECKING
from music_assistant.common.helpers.datetime import utc_timestamp
-from music_assistant.common.helpers.json import json_dumps, json_loads
from music_assistant.common.helpers.uri import parse_uri
from music_assistant.common.models.config_entries import ConfigEntry, ConfigValueType
from music_assistant.common.models.enums import (
ConfigEntryType,
EventType,
- ExternalID,
MediaType,
ProviderFeature,
ProviderType,
from music_assistant.common.models.errors import MediaNotFoundError, MusicAssistantError
from music_assistant.common.models.media_items import BrowseFolder, MediaItemType, SearchResults
from music_assistant.common.models.provider import SyncTask
+from music_assistant.common.models.streamdetails import LoudnessMeasurement
from music_assistant.constants import (
DB_SCHEMA_VERSION,
DB_TABLE_ALBUM_TRACKS,
return None
async def set_track_loudness(
- self, item_id: str, provider_instance_id_or_domain: str, loudness: int
+ self, item_id: str, provider_instance_id_or_domain: str, loudness: LoudnessMeasurement
) -> None:
- """List integrated loudness for a track in db."""
- await self.database.insert(
- DB_TABLE_TRACK_LOUDNESS,
- {
- "item_id": item_id,
- "provider": provider_instance_id_or_domain,
- "loudness": loudness,
- },
- allow_replace=True,
- )
+ """Store Loudness Measurement for a track in db."""
+ if provider := self.mass.get_provider(provider_instance_id_or_domain):
+ await self.database.insert(
+ DB_TABLE_TRACK_LOUDNESS,
+ {
+ "item_id": item_id,
+ "provider": provider.lookup_key,
+ "integrated": loudness.integrated,
+ "true_peak": loudness.true_peak,
+ "lra": loudness.lra,
+ "threshold": loudness.threshold,
+ },
+ allow_replace=True,
+ )
async def get_track_loudness(
self, item_id: str, provider_instance_id_or_domain: str
- ) -> float | None:
- """Get integrated loudness for a track in db."""
- if result := await self.database.get_row(
- DB_TABLE_TRACK_LOUDNESS,
- {
- "item_id": item_id,
- "provider": provider_instance_id_or_domain,
- },
- ):
- return result["loudness"]
- return None
-
- async def get_provider_loudness(self, provider_instance_id_or_domain: str) -> float | None:
- """Get average integrated loudness for tracks of given provider."""
- all_items = []
- if provider_instance_id_or_domain == "url":
- # this is not a very good idea for random urls
- return None
- for db_row in await self.database.get_rows(
- DB_TABLE_TRACK_LOUDNESS,
- {
- "provider": provider_instance_id_or_domain,
- },
- ):
- all_items.append(db_row["loudness"])
- if all_items:
- return statistics.fmean(all_items)
+ ) -> LoudnessMeasurement | None:
+ """Get Loudness Measurement for a track in db."""
+ if provider := self.mass.get_provider(provider_instance_id_or_domain):
+ if result := await self.database.get_row(
+ DB_TABLE_TRACK_LOUDNESS,
+ {
+ "item_id": item_id,
+ "provider": provider.lookup_key,
+ },
+ ):
+ return LoudnessMeasurement(
+ integrated=result["integrated"],
+ true_peak=result["true_peak"],
+ lra=result["lra"],
+ threshold=result["threshold"],
+ )
return None
async def mark_item_played(
await asyncio.to_thread(shutil.copyfile, db_path, db_path_backup)
# handle db migration from previous schema to this one
- if prev_version == 25:
+ if prev_version == 27:
self.logger.info(
"Performing database migration from %s to %s",
prev_version,
DB_SCHEMA_VERSION,
)
self.logger.warning("DATABASE MIGRATION IN PROGRESS - THIS CAN TAKE A WHILE")
- # migrate external id(s)
- for table in (
- DB_TABLE_ARTISTS,
- DB_TABLE_ALBUMS,
- DB_TABLE_TRACKS,
- DB_TABLE_PLAYLISTS,
- DB_TABLE_RADIOS,
- ):
- # create new external_ids column
- await self.database.execute(
- f"ALTER TABLE {table} "
- "ADD COLUMN external_ids "
- "json NOT NULL DEFAULT '[]'"
- )
- if table in (DB_TABLE_PLAYLISTS, DB_TABLE_RADIOS):
- continue
- # migrate existing ids into the new external_ids column
- async for item in self.database.iter_items(table):
- external_ids: set[tuple[str, str]] = set()
- if mbid := item["mbid"]:
- external_ids.add((ExternalID.MUSICBRAINZ, mbid))
- for prov_mapping in json_loads(item["provider_mappings"]):
- if isrc := prov_mapping.get("isrc"):
- external_ids.add((ExternalID.ISRC, isrc))
- if barcode := prov_mapping.get("barcode"):
- external_ids.add((ExternalID.BARCODE, barcode))
- if external_ids:
- await self.database.update(
- table,
- {
- "item_id": item["item_id"],
- },
- {
- "external_ids": json_dumps(external_ids),
- },
- )
- # drop mbid column
- await self.database.execute(f"DROP INDEX IF EXISTS {table}_mbid_idx")
- await self.database.execute(f"ALTER TABLE {table} DROP COLUMN mbid")
+
+ # migrate loudness measurements table
+ await self.database.execute(f"DROP TABLE IF EXISTS {DB_TABLE_TRACK_LOUDNESS}")
+ await self.__create_database_tables()
+
# db migration succeeded
self.logger.info(
"Database migration to version %s completed",
f"""CREATE TABLE IF NOT EXISTS {DB_TABLE_TRACK_LOUDNESS}(
item_id INTEGER NOT NULL,
provider TEXT NOT NULL,
- loudness REAL,
+ integrated REAL,
+ true_peak REAL,
+ lra REAL,
+ threshold REAL,
UNIQUE(item_id, provider));"""
)
await self.database.execute(
from music_assistant.common.models.queue_item import QueueItem
from music_assistant.constants import FALLBACK_DURATION, ROOT_LOGGER_NAME
from music_assistant.server.helpers.api import api_command
-from music_assistant.server.helpers.audio import set_stream_details
+from music_assistant.server.helpers.audio import get_stream_details
from music_assistant.server.models.core_controller import CoreController
if TYPE_CHECKING:
# try to restore previous state
if prev_state := await self.mass.cache.get(f"queue.state.{queue_id}"):
try:
- queue = PlayerQueue.from_dict(prev_state)
+ queue = PlayerQueue.from_cache(prev_state)
prev_items = await self.mass.cache.get(f"queue.items.{queue_id}", default=[])
- queue_items = [QueueItem.from_dict(x) for x in prev_items]
+ queue_items = [QueueItem.from_cache(x) for x in prev_items]
except Exception as err:
self.logger.warning(
"Failed to restore the queue(items) for %s - %s",
try:
# Check if the QueueItem is playable. For example, YT Music returns Radio Items
# that are not playable which will stop playback.
- await set_stream_details(mass=self.mass, queue_item=next_item)
+ await get_stream_details(mass=self.mass, queue_item=next_item)
# Lazy load the full MediaItem for the QueueItem, making sure to get the
# maximum quality of thumbs
next_item.media_item = await self.mass.music.get_item_by_uri(next_item.uri)
self.mass.create_task(
self.mass.cache.set(
f"queue.items.{queue_id}",
- [x.to_dict() for x in self._queue_items[queue_id]],
+ [x.to_cache() for x in self._queue_items[queue_id]],
)
)
self.mass.create_task(
self.mass.cache.set(
f"queue.state.{queue_id}",
- queue.to_dict(),
+ queue.to_cache(),
)
)
return # guard, just in case something bad happened
if not current_item.duration:
return
- if current_item.streamdetails and current_item.streamdetails.seconds_streamed:
+ # NOTE: 'seconds_streamed' can actually be 0 if there was a stream error!
+ if current_item.streamdetails and current_item.streamdetails.seconds_streamed is not None:
duration = current_item.streamdetails.seconds_streamed
else:
duration = current_item.duration
track_time = elapsed_time_queue - total_time
break
track_duration = (
+ # NOTE: 'seconds_streamed' can actually be 0 if there was a stream error!
queue_track.streamdetails.seconds_streamed
- or queue_track.streamdetails.duration
- or queue_track.duration
- or FALLBACK_DURATION
+ if queue_track.streamdetails.seconds_streamed is not None
+ else (
+ queue_track.streamdetails.duration
+ or queue_track.duration
+ or FALLBACK_DURATION
+ )
)
if elapsed_time_queue > (track_duration + total_time):
# total elapsed time is more than (streamed) track duration
check_audio_support,
crossfade_pcm_parts,
get_media_stream,
- set_stream_details,
+ get_stream_details,
)
from music_assistant.server.helpers.process import AsyncProcess
from music_assistant.server.helpers.util import get_ips
if not queue_item:
raise web.HTTPNotFound(reason=f"Unknown Queue item: {queue_item_id}")
try:
- await set_stream_details(self.mass, queue_item=queue_item)
+ queue_item.streamdetails = await get_stream_details(self.mass, queue_item=queue_item)
except MediaNotFoundError:
raise web.HTTPNotFound(
reason=f"Unable to retrieve streamdetails for item: {queue_item}"
# get (next) queue item to stream
if queue_track is None:
queue_track = start_queue_item
- await set_stream_details(self.mass, queue_track)
+ queue_track.streamdetails = await get_stream_details(self.mass, queue_track)
else:
seek_position = 0
fade_in = False
import aiofiles
from aiohttp import ClientResponseError, ClientTimeout
+from music_assistant.common.helpers.json import JSON_DECODE_EXCEPTIONS, json_loads
from music_assistant.common.models.errors import (
AudioError,
InvalidDataError,
MediaNotFoundError,
MusicAssistantError,
)
-from music_assistant.common.models.media_items import (
- AudioFormat,
- ContentType,
- MediaType,
- StreamDetails,
-)
+from music_assistant.common.models.media_items import AudioFormat, ContentType, MediaType
+from music_assistant.common.models.streamdetails import LoudnessMeasurement, StreamDetails
from music_assistant.constants import (
CONF_VOLUME_NORMALIZATION,
CONF_VOLUME_NORMALIZATION_TARGET,
from music_assistant.server import MusicAssistant
LOGGER = logging.getLogger(f"{ROOT_LOGGER_NAME}.audio")
-
+analyze_jobs: set[str] = set()
# pylint:disable=consider-using-f-string,too-many-locals,too-many-statements
return stripped_data
-async def analyze_audio(mass: MusicAssistant, streamdetails: StreamDetails) -> None:
- """Analyze track audio, for now we only calculate EBU R128 loudness."""
- if streamdetails.loudness is not None:
- # only when needed we do the analyze job
+async def analyze_loudness(mass: MusicAssistant, streamdetails: StreamDetails) -> None:
+ """Analyze track audio to calculate EBU R128 loudness."""
+ if streamdetails.uri in analyze_jobs:
return
-
- LOGGER.debug("Start analyzing audio for %s", streamdetails.uri)
- # calculate BS.1770 R128 integrated loudness with ffmpeg
- input_file = streamdetails.direct or "-"
- proc_args = [
- "ffmpeg",
- "-protocol_whitelist",
- "file,http,https,tcp,tls,crypto,pipe,fd",
- "-t",
- "300", # limit to 5 minutes to prevent OOM
- "-i",
- input_file,
- "-f",
- streamdetails.audio_format.content_type,
- "-af",
- "ebur128=framelog=verbose",
- "-f",
- "null",
- "-",
- ]
- async with AsyncProcess(
- proc_args,
- enable_stdin=streamdetails.direct is None,
- enable_stdout=False,
- enable_stderr=True,
- ) as ffmpeg_proc:
-
- async def writer() -> None:
- """Task that grabs the source audio and feeds it to ffmpeg."""
- music_prov = mass.get_provider(streamdetails.provider)
- chunk_count = 0
- async for audio_chunk in music_prov.get_audio_stream(streamdetails):
- chunk_count += 1
- await ffmpeg_proc.write(audio_chunk)
- if chunk_count == 300:
- # safety guard: max (more or less) 5 minutes seconds of audio may be analyzed
- break
- ffmpeg_proc.write_eof()
-
- if streamdetails.direct is None:
- writer_task = ffmpeg_proc.attach_task(writer())
- # wait for the writer task to finish
- await writer_task
-
- _, stderr = await ffmpeg_proc.communicate()
- try:
- loudness_str = (
- stderr.decode().split("Integrated loudness")[1].split("I:")[1].split("LUFS")[0]
- )
- loudness = float(loudness_str.strip())
- except (IndexError, ValueError, AttributeError):
- LOGGER.warning(
- "Could not determine integrated loudness of %s - %s",
- streamdetails.uri,
- stderr.decode() or "received empty value",
- )
- else:
- streamdetails.loudness = loudness
- await mass.music.set_track_loudness(
- streamdetails.item_id, streamdetails.provider, loudness
- )
- LOGGER.debug(
- "Integrated loudness of %s is: %s",
- streamdetails.uri,
- loudness,
- )
+ if len(analyze_jobs) >= 5:
+ LOGGER.debug("Skip analyzing EBU R128 loudness: max number of jobs reached")
+ return
+ try:
+ analyze_jobs.add(streamdetails.uri)
+ item_name = f"{streamdetails.provider}/{streamdetails.item_id}"
+ LOGGER.debug("Start analyzing EBU R128 loudness for %s", item_name)
+ # calculate EBU R128 integrated loudness with ffmpeg
+ input_file = streamdetails.direct or "-"
+ proc_args = [
+ "ffmpeg",
+ "-protocol_whitelist",
+ "file,http,https,tcp,tls,crypto,pipe,fd",
+ "-t",
+ "600", # limit to 10 minutes to prevent OOM
+ "-i",
+ input_file,
+ "-f",
+ streamdetails.audio_format.content_type,
+ "-af",
+ "loudnorm=print_format=json",
+ "-f",
+ "null",
+ "-",
+ ]
+ async with AsyncProcess(
+ proc_args,
+ enable_stdin=streamdetails.direct is None,
+ enable_stdout=False,
+ enable_stderr=True,
+ ) as ffmpeg_proc:
+
+ async def writer() -> None:
+ """Task that grabs the source audio and feeds it to ffmpeg."""
+ music_prov = mass.get_provider(streamdetails.provider)
+ chunk_count = 0
+ async for audio_chunk in music_prov.get_audio_stream(streamdetails):
+ chunk_count += 1
+ await ffmpeg_proc.write(audio_chunk)
+ if chunk_count == 600:
+ # safety guard: max (more or less) 10 minutes of audio may be analyzed!
+ break
+ ffmpeg_proc.write_eof()
+
+ if streamdetails.direct is None:
+ writer_task = ffmpeg_proc.attach_task(writer())
+ # wait for the writer task to finish
+ await writer_task
+
+ _, stderr = await ffmpeg_proc.communicate()
+ if loudness_details := _parse_loudnorm(stderr):
+ LOGGER.debug("Loudness measurement for %s: %s", item_name, loudness_details)
+ streamdetails.loudness = loudness_details
+ await mass.music.set_track_loudness(
+ streamdetails.item_id, streamdetails.provider, loudness_details
+ )
+ else:
+ LOGGER.warning(
+ "Could not determine EBU R128 loudness of %s - %s",
+ item_name,
+ stderr.decode() or "received empty value",
+ )
+ finally:
+ analyze_jobs.discard(streamdetails.uri)
-async def set_stream_details(mass: MusicAssistant, queue_item: QueueItem) -> None:
- """Set streamdetails for the given QueueItem.
+async def get_stream_details(mass: MusicAssistant, queue_item: QueueItem) -> StreamDetails:
+ """Get streamdetails for the given QueueItem.
This is called just-in-time when a PlayerQueue wants a MediaItem to be played.
Do not try to request streamdetails in advance as this is expiring data.
param media_item: The QueueItem for which to request the streamdetails for.
"""
- streamdetails = None
if queue_item.streamdetails and (time() < (queue_item.streamdetails.expires - 360)):
- LOGGER.debug(f"Using cached streamdetails for {queue_item.uri}")
- # we already have fresh streamdetails, use these
- queue_item.streamdetails.seconds_skipped = None
- queue_item.streamdetails.seconds_streamed = None
- streamdetails = queue_item.streamdetails
+ LOGGER.debug(f"Using cached streamdetails from queue_item for {queue_item.uri}")
+ # we already have (fresh) streamdetails stored on the queueitem, use these.
+ # make a copy to prevent we're altering an existing object and introduce race conditions.
+ streamdetails = StreamDetails.from_dict(queue_item.streamdetails.to_dict())
else:
- # fetch streamdetails from provider
# always request the full item as there might be other qualities available
full_item = await mass.music.get_item_by_uri(queue_item.uri)
# sort by quality and check track availability
if not prov_media.available:
LOGGER.debug(f"Skipping unavailable {prov_media}")
continue
- # get streamdetails from provider
+ # guard that provider is available
music_prov = mass.get_provider(prov_media.provider_instance)
if not music_prov:
LOGGER.debug(f"Skipping {prov_media} - provider not available")
continue # provider not available ?
+ # prefer cache
+ item_key = f"{music_prov.lookup_key}/{prov_media.item_id}"
+ cache_key = f"streamdetails_{item_key}"
+ if cache := await mass.cache.get(cache_key):
+ LOGGER.debug(f"Using cached streamdetails for {item_key}")
+ streamdetails = StreamDetails.from_dict(cache)
+ break
+ # get streamdetails from provider
try:
streamdetails: StreamDetails = await music_prov.get_stream_details(
prov_media.item_id
)
+ # store streamdetails in cache
+ expiration = streamdetails.expires - time()
+ if expiration > 300:
+ await mass.cache.set(cache_key, streamdetails.to_dict(), expiration - 60)
except MusicAssistantError as err:
LOGGER.warning(str(err))
else:
break
-
- if not streamdetails:
- msg = f"Unable to retrieve streamdetails for {queue_item}"
- raise MediaNotFoundError(msg)
+ else:
+ raise MediaNotFoundError(f"Unable to retrieve streamdetails for {queue_item}")
# set queue_id on the streamdetails so we know what is being streamed
streamdetails.queue_id = queue_item.queue_id
- # get gain correct / replaygain
- if streamdetails.gain_correct is None:
- loudness, gain_correct = await get_gain_correct(mass, streamdetails)
- streamdetails.gain_correct = gain_correct
- streamdetails.loudness = loudness
+ # handle volume normalization details
+ if not streamdetails.loudness:
+ streamdetails.loudness = await mass.music.get_track_loudness(
+ streamdetails.item_id, streamdetails.provider
+ )
+ if (
+ player_settings := await mass.config.get_player_config(streamdetails.queue_id)
+ ) and player_settings.get_value(CONF_VOLUME_NORMALIZATION):
+ streamdetails.target_loudness = player_settings.get_value(CONF_VOLUME_NORMALIZATION_TARGET)
+ else:
+ streamdetails.target_loudness = None
+
if not streamdetails.duration:
streamdetails.duration = queue_item.duration
# make sure that ffmpeg handles mpeg dash streams directly
and streamdetails.data.startswith("http")
):
streamdetails.direct = streamdetails.data
- # set streamdetails as attribute on the queue_item
- queue_item.streamdetails = streamdetails
-
-
-async def get_gain_correct(
- mass: MusicAssistant, streamdetails: StreamDetails
-) -> tuple[float | None, float | None]:
- """Get gain correction for given queue / track combination."""
- player_settings = await mass.config.get_player_config(streamdetails.queue_id)
- if not player_settings or not player_settings.get_value(CONF_VOLUME_NORMALIZATION):
- return (None, None)
- if streamdetails.gain_correct is not None:
- return (streamdetails.loudness, streamdetails.gain_correct)
- target_gain = player_settings.get_value(CONF_VOLUME_NORMALIZATION_TARGET)
- track_loudness = await mass.music.get_track_loudness(
- streamdetails.item_id, streamdetails.provider
- )
- if track_loudness is None:
- # fallback to provider average
- fallback_track_loudness = await mass.music.get_provider_loudness(streamdetails.provider)
- if fallback_track_loudness is None:
- # fallback to some (hopefully sane) average value for now
- fallback_track_loudness = -8.5
- gain_correct = target_gain - fallback_track_loudness
- else:
- gain_correct = target_gain - track_loudness
- gain_correct = round(gain_correct, 2)
- return (track_loudness, gain_correct)
+ return streamdetails
def create_wave_header(samplerate=44100, channels=2, bitspersample=16, duration=None):
fade_in=fade_in,
)
- async with AsyncProcess(args, enable_stdin=streamdetails.direct is None) as ffmpeg_proc:
+ async with AsyncProcess(
+ args, enable_stdin=streamdetails.direct is None, enable_stderr=True
+ ) as ffmpeg_proc:
LOGGER.debug("start media stream for: %s", streamdetails.uri)
async def writer() -> None:
prev_chunk = chunk
# all chunks received, strip silence of last part
- if strip_silence_end:
- stripped_audio = await strip_silence(
+
+ if strip_silence_end and prev_chunk:
+ final_chunk = await strip_silence(
mass,
prev_chunk,
sample_rate=pcm_format.sample_rate,
bit_depth=pcm_format.bit_depth,
reverse=True,
)
- yield (True, stripped_audio)
- bytes_sent += len(stripped_audio)
- del stripped_audio
else:
- yield (True, prev_chunk)
- bytes_sent += len(prev_chunk)
+ final_chunk = prev_chunk
+
+ # use communicate to read stderr and wait for exit
+ # read log for loudness measurement (or errors)
+ _, stderr = await ffmpeg_proc.communicate()
+ if ffmpeg_proc.returncode != 0:
+ # ffmpeg has a non zero returncode meaning trouble
+ LOGGER.getChild("ffmpeg").warning("STREAM ERROR on %s", streamdetails.uri)
+ LOGGER.getChild("ffmpeg").warning(stderr.decode())
+ elif loudness_details := _parse_loudnorm(stderr):
+ LOGGER.debug("Loudness measurement for %s: %s", streamdetails.uri, loudness_details)
+ streamdetails.loudness = loudness_details
+ await mass.music.set_track_loudness(
+ streamdetails.item_id, streamdetails.provider, loudness_details
+ )
+ else:
+ LOGGER.getChild("ffmpeg").debug(stderr.decode())
+ # ensure the final chunk is sent and mark as final
+ # its important this is done here at the end so we can catch errors first
+ yield (True, final_chunk)
+ bytes_sent += len(final_chunk)
+ del final_chunk
del prev_chunk
- # update duration details based on the actual pcm data we sent
- streamdetails.seconds_streamed = bytes_sent / pcm_sample_size
- streamdetails.duration = seek_position + streamdetails.seconds_streamed
-
except (asyncio.CancelledError, GeneratorExit):
LOGGER.debug("media stream aborted for: %s", streamdetails.uri)
raise
else:
LOGGER.debug("finished media stream for: %s", streamdetails.uri)
+ # store accurate duration
+ streamdetails.duration = seek_position + bytes_sent / pcm_sample_size
finally:
# report playback
- await mass.music.mark_item_played(
- streamdetails.media_type, streamdetails.item_id, streamdetails.provider
- )
- if streamdetails.callback:
- mass.create_task(streamdetails.callback, streamdetails)
- # send analyze job to background worker
- if streamdetails.loudness is None:
- mass.create_task(analyze_audio(mass, streamdetails))
+ seconds_streamed = bytes_sent / pcm_sample_size if bytes_sent else 0
+ streamdetails.seconds_streamed = seconds_streamed
+ if seconds_streamed < 20:
+ mass.create_task(
+ mass.music.mark_item_played(
+ streamdetails.media_type, streamdetails.item_id, streamdetails.provider
+ )
+ )
+ if music_prov := mass.get_provider(streamdetails.provider):
+ mass.create_task(music_prov.on_streamed(streamdetails, seconds_streamed))
+
+ if not streamdetails.loudness:
+ # send loudness analyze job to background worker
+ # note that we only do this if a track was at least been partially played
+ mass.create_task(analyze_loudness(mass, streamdetails))
async def resolve_radio_stream(mass: MusicAssistant, url: str) -> tuple[str, bool]:
"ffmpeg",
"-hide_banner",
"-loglevel",
- "warning" if LOGGER.isEnabledFor(logging.DEBUG) else "quiet",
+ "info",
"-ignore_unknown",
"-protocol_whitelist",
"file,http,https,tcp,tls,crypto,pipe,data,fd",
# collect extra and filter args
extra_args = []
filter_params = []
- if streamdetails.gain_correct is not None:
- filter_params.append(f"volume={streamdetails.gain_correct}dB")
+ if streamdetails.target_loudness is not None:
+ filter_rule = f"loudnorm=I={streamdetails.target_loudness}:LRA=7:tp=-2:offset=-0.5"
+ if streamdetails.loudness:
+ filter_rule += f":measured_I={streamdetails.loudness.integrated}"
+ filter_rule += f":measured_LRA={streamdetails.loudness.lra}"
+ filter_rule += f":measured_tp={streamdetails.loudness.true_peak}"
+ filter_rule += f":measured_thresh={streamdetails.loudness.threshold}"
+ filter_rule += ":print_format=json"
+ filter_params.append(filter_rule)
if (
streamdetails.audio_format.sample_rate != pcm_output_format.sample_rate
and libsoxr_support
extra_args += ["-af", ",".join(filter_params)]
return generic_args + input_args + extra_args + output_args
+
+
+def _parse_loudnorm(raw_stderr: bytes | str) -> LoudnessMeasurement | None:
+ """Parse Loudness measurement from ffmpeg stderr output."""
+ stderr_data = raw_stderr.decode() if isinstance(raw_stderr, bytes) else raw_stderr
+ if "[Parsed_loudnorm_" not in stderr_data:
+ return None
+ stderr_data = stderr_data.split("[Parsed_loudnorm_")[1]
+ stderr_data = stderr_data.rsplit("]")[-1].strip()
+ try:
+ loudness_data = json_loads(stderr_data)
+ except JSON_DECODE_EXCEPTIONS:
+ return None
+ return LoudnessMeasurement(
+ integrated=float(loudness_data["input_i"]),
+ true_peak=float(loudness_data["input_tp"]),
+ lra=float(loudness_data["input_lra"]),
+ threshold=float(loudness_data["input_thresh"]),
+ )
self._enable_stderr = enable_stderr
self._attached_task: asyncio.Task = None
self.closed = False
+ self.returncode: int | None = None
async def __aenter__(self) -> AsyncProcess:
"""Enter context manager."""
if self._proc.returncode is None:
try:
async with asyncio.timeout(10):
- await self._proc.communicate()
+ await self.communicate()
except TimeoutError:
self._proc.kill()
await self.wait()
async def wait(self) -> int:
"""Wait for the process and return the returncode."""
+ if self.returncode is not None:
+ return self.returncode
if self._proc.returncode is not None:
- return self._proc.returncode
- exitcode = await self._proc.wait()
+ self.returncode = self._proc.returncode
+ return self.returncode
+ self.returncode = await self._proc.wait()
self.closed = True
- return exitcode
+ return self.returncode
async def communicate(self, input_data: bytes | None = None) -> tuple[bytes, bytes]:
"""Write bytes to process and read back results."""
- return await self._proc.communicate(input_data)
+ stdout, stderr = await self._proc.communicate(input_data)
+ self.returncode = self._proc.returncode
+ return (stdout, stderr)
+
+ async def read_stderr(self, n: int = -1) -> bytes:
+ """Read up to n bytes from the stderr stream.
+
+ If n is positive, this function try to read n bytes,
+ and may return less or equal bytes than requested, but at least one byte.
+ If EOF was received before any byte is read, this function returns empty byte object.
+ """
+ return await self._proc.stderr.read(n)
def attach_task(self, coro: Coroutine) -> asyncio.Task:
"""Attach given coro func as reader/writer task to properly cancel it when needed."""
PlaylistTrack,
Radio,
SearchResults,
- StreamDetails,
Track,
)
+from music_assistant.common.models.streamdetails import StreamDetails
from .provider import Provider
"""
return True
+ @property
+ def lookup_key(self) -> str:
+ """Return domain if streaming_provider or instance_id otherwise."""
+ return self.domain if self.is_streaming_provider else self.instance_id
+
async def search(
self,
search_query: str,
if ProviderFeature.SIMILAR_TRACKS in self.supported_features:
raise NotImplementedError
- async def get_stream_details(self, item_id: str) -> StreamDetails | None:
+ async def get_stream_details(self, item_id: str) -> StreamDetails:
"""Get streamdetails for a track/radio."""
raise NotImplementedError
if streamdetails.direct is None:
raise NotImplementedError
+ async def on_streamed(self, streamdetails: StreamDetails, seconds_streamed: int) -> None:
+ """Handle callback when an item completed streaming."""
+
async def resolve_image(self, path: str) -> str | bytes | AsyncGenerator[bytes, None]:
"""
Resolve an image from an image path.
PlaylistTrack,
ProviderMapping,
SearchResults,
- StreamDetails,
Track,
)
from music_assistant.common.models.provider import ProviderManifest
+from music_assistant.common.models.streamdetails import StreamDetails
# pylint: disable=no-name-in-module
from music_assistant.server.helpers.app_vars import app_var
]["data"][:limit]
return [await self.get_track(track["SNG_ID"]) for track in tracks]
- async def get_stream_details(self, item_id: str) -> StreamDetails | None:
+ async def get_stream_details(self, item_id: str) -> StreamDetails:
"""Return the content details for the given track when it will be streamed."""
url_details, song_data = await self.gw_client.get_deezer_track_urls(item_id)
url = url_details["sources"][0]["url"]
data={"url": url, "format": url_details["format"]},
expires=url_details["exp"],
size=int(song_data[f"FILESIZE_{url_details['format']}"]),
- callback=self.log_listen_cb,
)
async def get_audio_stream(
del buffer[:2048]
yield bytes(buffer)
- async def log_listen_cb(self, stream_details) -> None:
- """Log the end of a track playback."""
- await self.gw_client.log_listen(last_track=stream_details)
+ async def on_streamed(self, streamdetails: StreamDetails, seconds_streamed: int) -> None:
+ """Handle callback when an item completed streaming."""
+ await self.gw_client.log_listen(last_track=streamdetails)
### PARSING METADATA FUNCTIONS ###
from aiohttp import ClientSession
from yarl import URL
-from music_assistant.common.models.media_items import StreamDetails
+from music_assistant.common.models.streamdetails import StreamDetails
USER_AGENT_HEADER = (
"Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) "
PlaylistTrack,
ProviderMapping,
SearchResults,
- StreamDetails,
Track,
)
+from music_assistant.common.models.streamdetails import StreamDetails
from music_assistant.constants import VARIOUS_ARTISTS_NAME
from music_assistant.server.controllers.cache import use_cache
from music_assistant.server.controllers.music import DB_SCHEMA_VERSION
PlaylistTrack,\r
ProviderMapping,\r
SearchResults,\r
- StreamDetails,\r
Track,\r
)\r
from music_assistant.common.models.media_items import Album as JellyfinAlbum\r
from music_assistant.common.models.media_items import Artist as JellyfinArtist\r
from music_assistant.common.models.media_items import Playlist as JellyfinPlaylist\r
from music_assistant.common.models.media_items import Track as JellyfinTrack\r
+from music_assistant.common.models.streamdetails import StreamDetails\r
\r
if TYPE_CHECKING:\r
from music_assistant.common.models.provider import ProviderManifest\r
PlaylistTrack,
ProviderMapping,
SearchResults,
- StreamDetails,
Track,
)
+from music_assistant.common.models.streamdetails import StreamDetails
from music_assistant.constants import (
CONF_PASSWORD,
CONF_PATH,
)
return [self._parse_track(entry) for entry in songs]
- async def get_stream_details(self, item_id: str) -> StreamDetails | None:
+ async def get_stream_details(self, item_id: str) -> StreamDetails:
"""Get the details needed to process a specified track."""
try:
sonic_song: SonicSong = await self._run_async(self._conn.getSong, item_id)
can_seek=self._seek_support,
audio_format=AudioFormat(content_type=ContentType.try_parse(mime_type)),
duration=sonic_song.duration if sonic_song.duration is not None else 0,
- callback=self._report_playback_stopped,
)
async def _report_playback_started(self, item_id: str) -> None:
await self._run_async(self._conn.scrobble, sid=item_id, submission=False)
- async def _report_playback_stopped(self, streamdetails: StreamDetails) -> None:
- if streamdetails.seconds_streamed >= streamdetails.duration / 2:
+ async def on_streamed(self, streamdetails: StreamDetails, seconds_streamed: int) -> None:
+ """Handle callback when an item completed streaming."""
+ if seconds_streamed >= streamdetails.duration / 2:
await self._run_async(self._conn.scrobble, sid=streamdetails.item_id, submission=True)
async def get_audio_stream(
PlaylistTrack,
ProviderMapping,
SearchResults,
- StreamDetails,
Track,
)
+from music_assistant.common.models.streamdetails import StreamDetails
from music_assistant.server.helpers.auth import AuthenticationHelper
from music_assistant.server.helpers.tags import parse_tags
from music_assistant.server.models.music_provider import MusicProvider
return albums
return []
- async def get_stream_details(self, item_id: str) -> StreamDetails | None:
+ async def get_stream_details(self, item_id: str) -> StreamDetails:
"""Get streamdetails for a track."""
plex_track = await self._get_data(item_id, PlexTrack)
if not plex_track or not plex_track.media:
PlaylistTrack,
ProviderMapping,
SearchResults,
- StreamDetails,
Track,
)
+from music_assistant.common.models.streamdetails import StreamDetails
from music_assistant.constants import (
CONF_PASSWORD,
CONF_USERNAME,
data=streamdata, # we need these details for reporting playback
expires=time.time() + 3600, # not sure about the real allowed value
direct=streamdata["url"],
- callback=self._report_playback_stopped,
)
async def _report_playback_started(self, streamdata: dict) -> None:
]
await self._post_data("track/reportStreamingStart", data=events)
- async def _report_playback_stopped(self, streamdetails: StreamDetails) -> None:
- """Report playback stop to qobuz."""
+ async def on_streamed(self, streamdetails: StreamDetails, seconds_streamed: int) -> None:
+ """Handle callback when an item completed streaming."""
user_id = self._user_auth_info["user"]["id"]
await self._get_data(
"/track/reportStreamingEnd",
user_id=user_id,
track_id=str(streamdetails.item_id),
- duration=try_parse_int(streamdetails.seconds_streamed),
+ duration=try_parse_int(seconds_streamed),
)
async def _parse_artist(self, artist_obj: dict):
ProviderMapping,
Radio,
SearchResults,
- StreamDetails,
)
+from music_assistant.common.models.streamdetails import StreamDetails
from music_assistant.server.helpers.audio import get_radio_stream, resolve_radio_stream
from music_assistant.server.models.music_provider import MusicProvider
PlaylistTrack,
ProviderMapping,
SearchResults,
- StreamDetails,
Track,
)
+from music_assistant.common.models.streamdetails import StreamDetails
from music_assistant.server.models.music_provider import MusicProvider
from .soundcloudpy.asyncsoundcloudpy import SoundcloudAsyncAPI
PlaylistTrack,
ProviderMapping,
SearchResults,
- StreamDetails,
Track,
)
+from music_assistant.common.models.streamdetails import StreamDetails
from music_assistant.constants import CONF_PASSWORD, CONF_USERNAME
# pylint: disable=no-name-in-module
"""Return the content details for the given track when it will be streamed."""
# make sure a valid track is requested.
track = await self.get_track(item_id)
- if not track:
- msg = f"track {item_id} not found"
- raise MediaNotFoundError(msg)
- # make sure that the token is still valid by just requesting it
- await self.login()
return StreamDetails(
item_id=track.item_id,
provider=self.instance_id,
content_type=ContentType.OGG,
),
duration=track.duration,
+ # these streamdetails may be cached for a long time,
+ # as there is no time sensitive info in them
+ expires=time.time() + 30 * 24 * 3600,
)
async def get_audio_stream(
PlaylistTrack,
ProviderMapping,
SearchResults,
- StreamDetails,
Track,
)
+from music_assistant.common.models.streamdetails import StreamDetails
from music_assistant.server.helpers.auth import AuthenticationHelper
from music_assistant.server.helpers.tags import AudioTags, parse_tags
from music_assistant.server.models.music_provider import MusicProvider
MediaType,
ProviderMapping,
Radio,
- StreamDetails,
)
+from music_assistant.common.models.streamdetails import StreamDetails
from music_assistant.constants import CONF_USERNAME
from music_assistant.server.helpers.audio import get_radio_stream, resolve_radio_stream
from music_assistant.server.helpers.tags import parse_tags
MediaItemType,
ProviderMapping,
Radio,
- StreamDetails,
Track,
)
+from music_assistant.common.models.streamdetails import StreamDetails
from music_assistant.server.helpers.audio import (
get_file_stream,
get_http_stream,
await self.mass.cache.set(cache_key, media_info.raw)
return (item_id, url, media_info)
- async def get_stream_details(self, item_id: str) -> StreamDetails | None:
+ async def get_stream_details(self, item_id: str) -> StreamDetails:
"""Get streamdetails for a track/radio."""
item_id, url, media_info = await self._get_media_info(item_id)
is_radio = media_info.get("icy-name") or not media_info.duration
PlaylistTrack,
ProviderMapping,
SearchResults,
- StreamDetails,
Track,
)
+from music_assistant.common.models.streamdetails import StreamDetails
from music_assistant.server.helpers.auth import AuthenticationHelper
from music_assistant.server.models.music_provider import MusicProvider