CONF_VOLUME_CONTROL: Final[str] = "volume_control"
CONF_MUTE_CONTROL: Final[str] = "mute_control"
CONF_OUTPUT_CODEC: Final[str] = "output_codec"
-CONF_ALLOW_MEMORY_CACHE: Final[str] = "allow_memory_cache"
+CONF_ALLOW_AUDIO_CACHE: Final[str] = "allow_audio_cache"
+CONF_AUDIO_CACHE_MAX_SIZE: Final[str] = "audio_cache_max_size"
# config default values
DEFAULT_HOST: Final[str] = "0.0.0.0"
DEFAULT_PORT: Final[int] = 8095
-DEFAULT_ALLOW_MEMORY_CACHE: Final[bool] = True
+DEFAULT_ALLOW_AUDIO_CACHE: Final[str] = "auto"
+DEFAULT_AUDIO_CACHE_MAX_SIZE: Final[int] = 5  # 5 GB
# common db tables
DB_TABLE_PLAYLOG: Final[str] = "playlog"
from music_assistant.constants import (
ANNOUNCE_ALERT_FILE,
- CONF_ALLOW_MEMORY_CACHE,
+ CONF_ALLOW_AUDIO_CACHE,
+ CONF_AUDIO_CACHE_MAX_SIZE,
CONF_BIND_IP,
CONF_BIND_PORT,
CONF_CROSSFADE,
CONF_VOLUME_NORMALIZATION_FIXED_GAIN_TRACKS,
CONF_VOLUME_NORMALIZATION_RADIO,
CONF_VOLUME_NORMALIZATION_TRACKS,
- DEFAULT_ALLOW_MEMORY_CACHE,
+ DEFAULT_ALLOW_AUDIO_CACHE,
+ DEFAULT_AUDIO_CACHE_MAX_SIZE,
DEFAULT_PCM_FORMAT,
DEFAULT_STREAM_HEADERS,
ICY_HEADERS,
)
from music_assistant.helpers.ffmpeg import LOGGER as FFMPEG_LOGGER
from music_assistant.helpers.ffmpeg import check_ffmpeg_version, get_ffmpeg_stream
-from music_assistant.helpers.util import get_ip, get_ips, select_free_port, try_parse_bool
+from music_assistant.helpers.util import (
+ clean_old_files,
+ get_ip,
+ get_ips,
+ select_free_port,
+ try_parse_bool,
+)
from music_assistant.helpers.webserver import Webserver
from music_assistant.models.core_controller import CoreController
from music_assistant.models.plugin import PluginProvider
)
self.manifest.icon = "cast-audio"
self.announcements: dict[str, str] = {}
+ # create cache dir if needed
+ self._audio_cache_dir = audio_cache_dir = os.path.join(self.mass.cache_path, ".audio")
+ if not os.path.isdir(audio_cache_dir):
+ os.makedirs(audio_cache_dir)
@property
def base_url(self) -> str:
"""Return the base_url for the streamserver."""
return self._server.base_url
+ @property
+ def audio_cache_dir(self) -> str:
+ """Return the directory where audio cache files are stored."""
+ return self._audio_cache_dir
+
async def get_config_entries(
self,
action: str | None = None,
required=False,
),
ConfigEntry(
- key=CONF_ALLOW_MEMORY_CACHE,
- type=ConfigEntryType.BOOLEAN,
- default_value=DEFAULT_ALLOW_MEMORY_CACHE,
- label="Allow (in-memory) caching of audio streams",
- description="To ensure smooth playback as well as fast seeking, "
- "Music Assistant by default caches audio streams (in memory). "
- "On systems with limited memory, this can be disabled, "
- "but may result in less smooth playback.",
+ key=CONF_ALLOW_AUDIO_CACHE,
+ type=ConfigEntryType.STRING,
+ default_value=DEFAULT_ALLOW_AUDIO_CACHE,
+ options=[
+ ConfigValueOption("Always", "always"),
+ ConfigValueOption("Disabled", "disabled"),
+ ConfigValueOption("Auto", "auto"),
+ ],
+ label="Allow caching of remote/cloud-based audio streams",
+ description="To ensure smooth(er) playback as well as fast seeking, "
+ "Music Assistant can cache audio streams on disk. \n"
+ "On systems with limited diskspace, this can be disabled, "
+ "but may result in less smooth playback or slower seeking.\n\n"
+ "**Always:** Enforce caching of audio streams at all times "
+ "(as long as there is enough free space).\n"
+ "**Disabled:** Never cache audio streams.\n"
+ "**Auto:** Let Music Assistant decide if caching "
+ "should be used on a per-item basis.",
+ category="advanced",
+ required=True,
+ ),
+ ConfigEntry(
+ key=CONF_AUDIO_CACHE_MAX_SIZE,
+ type=ConfigEntryType.INTEGER,
+ default_value=DEFAULT_AUDIO_CACHE_MAX_SIZE,
+ label="Maximum size of audio cache",
+ description="The maximum amount of diskspace (in GB) "
+ "the audio cache may consume (if enabled).",
+ range=(1, 50),
category="advanced",
- required=False,
),
)
FFMPEG_LOGGER.setLevel(self.logger.level)
# perform check for ffmpeg version
await check_ffmpeg_version()
+ await self._clean_audio_cache()
# start the webserver
self.publish_port = config.get_value(CONF_BIND_PORT)
self.publish_ip = config.get_value(CONF_PUBLISH_IP)
bit_depth=DEFAULT_PCM_FORMAT.bit_depth,
channels=2,
)
+
+ async def _clean_audio_cache(self) -> None:
+ """Clean up audio cache periodically."""
+ max_cache_size = await self.mass.config.get_core_config_value(
+ self.domain, CONF_AUDIO_CACHE_MAX_SIZE
+ )
+ cache_enabled = await self.mass.config.get_core_config_value(
+ self.domain, CONF_ALLOW_AUDIO_CACHE
+ )
+ if cache_enabled == "disabled":
+ max_cache_size = 0.001
+ await clean_old_files(self.audio_cache_dir, max_cache_size)
+ # reschedule self
+ self.mass.call_later(3600, self._clean_audio_cache)
from music_assistant_models.streamdetails import AudioFormat
from music_assistant.constants import (
- CONF_ALLOW_MEMORY_CACHE,
+ CONF_ALLOW_AUDIO_CACHE,
CONF_ENTRY_OUTPUT_LIMITER,
CONF_OUTPUT_CHANNELS,
CONF_VOLUME_NORMALIZATION,
CONF_VOLUME_NORMALIZATION_RADIO,
CONF_VOLUME_NORMALIZATION_TARGET,
CONF_VOLUME_NORMALIZATION_TRACKS,
- DEFAULT_ALLOW_MEMORY_CACHE,
+ DEFAULT_ALLOW_AUDIO_CACHE,
MASS_LOGGER_NAME,
VERBOSE_LOG_LEVEL,
)
from .ffmpeg import FFMpeg, get_ffmpeg_stream
from .playlists import IsHLSPlaylist, PlaylistItem, fetch_playlist, parse_m3u
from .process import AsyncProcess, communicate
-from .util import detect_charset
+from .util import detect_charset, has_enough_space
if TYPE_CHECKING:
from music_assistant_models.config_entries import CoreConfig, PlayerConfig
HTTP_HEADERS = {"User-Agent": "Lavf/60.16.100.MusicAssistant"}
HTTP_HEADERS_ICY = {**HTTP_HEADERS, "Icy-MetaData": "1"}
+REQUIRED_FREE_CACHE_SPACE = 5 # 5 GB
+
async def remove_file(file_path: str) -> None:
"""Remove file path (if it exists)."""
"""
StreamCache.
- Basic class to handle (temporary) in-memory caching of audio streams.
+ Basic class to handle caching of audio streams to a (semi) temporary file.
Useful in case of slow or unreliable network connections, faster seeking,
or when the audio stream is slow itself.
"""
async def create(self) -> None:
"""Create the cache file (if needed)."""
- self.mass.cancel_timer(f"clear_cache_{self.cache_id}")
+ if (
+ self._cache_file is not None
+ and await asyncio.to_thread(os.path.exists, self._cache_file)
+ and self._first_part_received.is_set()
+ ):
+ # cache file already exists
+ return
+ # use cache controller to store the translation of uri-->cache file
+ if stored_cache_file := await self.mass.cache.get(
+ self.streamdetails.uri, base_key="audiocache"
+ ):
+ # cache file already exists in memory
+ self._cache_file = stored_cache_file
+ if await asyncio.to_thread(os.path.exists, self._cache_file):
+ # cache file already exists
+ return
+ else:
+ # create new cache file
+ cache_id = shortuuid.random(30)
+ self._cache_file = os.path.join(self.mass.streams.audio_cache_dir, cache_id)
+ await self.mass.cache.set(
+ self.streamdetails.uri,
+ self._cache_file,
+ base_key="audiocache",
+ )
+ # start fetch task if its not already running
if self._fetch_task is None:
- self._fetch_task = self.mass.create_task(self._fill_cache())
+ self._fetch_task = self.mass.create_task(self._create_cache_file())
# wait until the first part of the file is received
await self._first_part_received.wait()
- async def get_audio_stream(self) -> AsyncGenerator[bytes, None]:
- """Stream audio from cachedata (while it might even still being written)."""
- try:
- self._subscribers += 1
- bytes_read = 0
- chunksize = 64000
- await self.create()
- while True:
- async with self._lock:
- chunk = self._data[bytes_read : bytes_read + chunksize]
- bytes_read += len(chunk)
- if len(chunk) < chunksize and self._all_data_written.is_set():
- # reached EOF
- break
- elif not chunk:
- # data is not yet available, wait a bit
- await asyncio.sleep(0.05)
- else:
- yield chunk
- del chunk
- await asyncio.sleep(0) # yield to eventloop
- finally:
- self._subscribers -= 1
- if self._subscribers == 0:
- # set a timer to remove the tempfile after 1 minute
- # if the file is accessed again within this period,
- # the timer will be cancelled
- self.mass.call_later(60, self._clear, task_id=f"clear_cache_{self.cache_id}")
-
- async def _fill_cache(self) -> None:
+ async def get_audio_stream(self) -> str | AsyncGenerator[bytes, None]:
+ """
+ Get the cached audio stream.
+
+ Returns a string with the path of the cachefile if the file is ready.
+ If the file is not yet ready, it will return an async generator that will
+ stream the (intermediate) audio data from the cache file.
+ """
+
+ async def _stream_from_cache() -> AsyncGenerator[bytes, None]:
+ chunksize = get_chunksize(self.streamdetails.audio_format, 1)
+ async with aiofiles.open(self._cache_file, "rb") as file:
+ while True:
+ chunk = await file.read(chunksize)
+ if chunk:
+ yield chunk
+ await asyncio.sleep(0) # yield to eventloop
+ del chunk
+ elif self._all_data_written.is_set():
+ # reached EOF
+ break
+ else:
+ # data is not yet available, wait a bit
+ await asyncio.sleep(0.05)
+
+ if await asyncio.to_thread(os.path.exists, self._cache_file):
+ if self._fetch_task is None:
+ # a complete cache file already exists on disk from a previous run
+ return self._cache_file
+ if self._all_data_written.is_set():
+ # cache file is ready
+ return self._cache_file
+ # cache file does not exist at all (or is still being written)
+ await self.create()
+ return _stream_from_cache()
+
+ async def _create_cache_file(self) -> None:
time_start = time.time()
- self.logger.debug("Fetching audio stream for %s", self.streamdetails.uri)
+ self.logger.debug("Creating audio cache for %s", self.streamdetails.uri)
+ extra_input_args = self.org_extra_input_args or []
if self.org_stream_type == StreamType.CUSTOM:
audio_source = self.mass.get_provider(self.streamdetails.provider).get_audio_stream(
self.streamdetails,
)
- elif self.org_stream_type in (StreamType.HTTP, StreamType.ENCRYPTED_HTTP, StreamType.HLS):
+ elif self.org_stream_type == StreamType.ICY:
+ raise NotImplementedError("Caching of this streamtype is not supported!")
+ elif self.org_stream_type == StreamType.HLS:
+ if self.streamdetails.media_type == MediaType.RADIO:
+ raise NotImplementedError("Caching of this streamtype is not supported!")
+ substream = await get_hls_substream(self.mass, self.org_path)
+ audio_source = substream.path
+ elif self.org_stream_type == StreamType.ENCRYPTED_HTTP:
audio_source = self.org_path
+ extra_input_args += ["-decryption_key", self.streamdetails.decryption_key]
+ elif self.org_stream_type == StreamType.MULTI_FILE:
+ audio_source = get_multi_file_stream(self.mass, self.streamdetails)
else:
- raise NotImplementedError("Caching of this streamtype is not supported")
-
- extra_input_args = self.org_extra_input_args or []
- if self.streamdetails.decryption_key:
- extra_input_args += [
- "-decryption_key",
- self.streamdetails.decryption_key,
- ]
+ audio_source = self.org_path
- # we always use an intermediate ffmpeg process to fetch the original audio source
+ # we always use ffmpeg to fetch the original audio source
# this may feel a bit redundant, but it's the most reliable way to fetch the audio
# because ffmpeg has all logic to handle different audio formats, codecs, etc.
# and it also accounts for complicated cases such as encrypted streams or
# m4a/mp4 streams with the moov atom at the end of the file.
- # ffmpeg will produce a lossless copy of the original codec to stdout.
+ # ffmpeg will produce a lossless copy of the original codec.
self._first_part_received.clear()
self._all_data_written.clear()
- self._data = b""
- async for chunk in get_ffmpeg_stream(
- audio_input=audio_source,
- input_format=self.streamdetails.audio_format,
- output_format=self.streamdetails.audio_format,
- chunk_size=64000,
- # apply readrate limiting to avoid buffering too much data too fast
- # so we only allow reading into the cache max 5 times the normal speed
- extra_input_args=["-readrate", "5", *extra_input_args],
- ):
- async with self._lock:
- self._data += chunk
- del chunk
- await asyncio.sleep(0) # yield to eventloop
- if not self._first_part_received.is_set():
- self._first_part_received.set()
- self.logger.debug(
- "First part received for %s after %.2fs",
- self.streamdetails.uri,
- time.time() - time_start,
- )
- self._all_data_written.set()
- self.logger.debug(
- "Writing all data for %s done in %.2fs",
- self.streamdetails.uri,
- time.time() - time_start,
- )
+ try:
+ ffmpeg_proc = FFMpeg(
+ audio_input=audio_source,
+ input_format=self.org_audio_format,
+ output_format=self.streamdetails.audio_format,
+ extra_input_args=extra_input_args,
+ audio_output=self._cache_file,
+ collect_log_history=True,
+ )
+ await ffmpeg_proc.start()
+ # wait until the first data is written to the cache file
+ while ffmpeg_proc.returncode is None:
+ await asyncio.sleep(0.1)
+ if not await asyncio.to_thread(os.path.exists, self._cache_file):
+ continue
+ if await asyncio.to_thread(os.path.getsize, self._cache_file) > 64000:
+ break
+
+ self._first_part_received.set()
+ self.logger.debug(
+ "First part received for %s after %.2fs",
+ self.streamdetails.uri,
+ time.time() - time_start,
+ )
+ # wait until ffmpeg is done
+ await ffmpeg_proc.wait()
+
+ if ffmpeg_proc.returncode != 0:
+ ffmpeg_proc.logger.warning("\n".join(ffmpeg_proc.log_history))
+ raise AudioError(f"FFMpeg error {ffmpeg_proc.returncode}")
+
+ self._all_data_written.set()
+ self.logger.debug(
+ "Writing all data for %s done in %.2fs",
+ self.streamdetails.uri,
+ time.time() - time_start,
+ )
+ except Exception as err:
+ self.logger.error("Error while creating cache for %s: %s", self.streamdetails.uri, err)
+ # remove the cache file
+ await remove_file(self._cache_file)
+ finally:
+ await ffmpeg_proc.close()
def __init__(self, mass: MusicAssistant, streamdetails: StreamDetails) -> None:
"""Initialize the StreamCache."""
self.mass = mass
self.streamdetails = streamdetails
- self.cache_id = shortuuid.random(20)
self.logger = LOGGER.getChild("cache")
+ self._cache_file: str | None = None
self._fetch_task: asyncio.Task | None = None
self._subscribers: int = 0
self._first_part_received = asyncio.Event()
self._all_data_written = asyncio.Event()
- self._data: bytes = b""
- self._lock: asyncio.Lock = asyncio.Lock()
self.org_path: str | None = streamdetails.path
self.org_stream_type: StreamType | None = streamdetails.stream_type
self.org_extra_input_args: list[str] | None = streamdetails.extra_input_args
+ self.org_audio_format = streamdetails.audio_format
+ streamdetails.audio_format = AudioFormat(
+ content_type=ContentType.NUT,
+ codec_type=streamdetails.audio_format.codec_type,
+ sample_rate=streamdetails.audio_format.sample_rate,
+ bit_depth=streamdetails.audio_format.bit_depth,
+ channels=streamdetails.audio_format.channels,
+ )
streamdetails.path = "-"
streamdetails.stream_type = StreamType.CACHE
streamdetails.can_seek = True
streamdetails.allow_seek = True
streamdetails.extra_input_args = []
- async def _clear(self) -> None:
- """Clear the cache."""
- self.logger.debug("Cleaning up cache %s", self.streamdetails.uri)
- if self._fetch_task and not self._fetch_task.done():
- self._fetch_task.cancel()
- self._fetch_task = None
- self._first_part_received.clear()
- self._all_data_written.clear()
- del self._data
- self._data = b""
-
- def __del__(self) -> None:
- """Ensure the cache data gets cleaned up."""
- if self.mass.closing:
- # edge case: MA is closing
- return
- self.mass.cancel_timer(f"remove_file_{self.cache_id}")
- del self._data
-
async def crossfade_pcm_parts(
fade_in_part: bytes,
int((time.time() - time_start) * 1000),
)
- if streamdetails.decryption_key:
- # using intermediate cache is mandatory for encrypted streams
- streamdetails.enable_cache = True
-
# determine if we may use caching for the audio stream
if streamdetails.enable_cache is None:
- allow_cache = mass.config.get_raw_core_config_value(
- "streams", CONF_ALLOW_MEMORY_CACHE, DEFAULT_ALLOW_MEMORY_CACHE
- )
- streamdetails.enable_cache = (
- allow_cache
- and streamdetails.duration is not None
- and streamdetails.media_type
- in (MediaType.TRACK, MediaType.AUDIOBOOK, MediaType.PODCAST_EPISODE)
- and streamdetails.stream_type
- in (StreamType.HTTP, StreamType.ENCRYPTED_HTTP, StreamType.CUSTOM, StreamType.HLS)
- and get_chunksize(streamdetails.audio_format, streamdetails.duration) < 100000000
- )
+ streamdetails.enable_cache = await _is_cache_allowed(mass, streamdetails)
# handle temporary cache support of audio stream
if streamdetails.enable_cache:
return streamdetails
+async def _is_cache_allowed(mass: MusicAssistant, streamdetails: StreamDetails) -> bool:
+ """Check if caching is allowed for the given streamdetails."""
+ if streamdetails.media_type not in (
+ MediaType.TRACK,
+ MediaType.AUDIOBOOK,
+ MediaType.PODCAST_EPISODE,
+ ):
+ return False
+ if streamdetails.stream_type in (StreamType.ICY, StreamType.LOCAL_FILE, StreamType.UNKNOWN):
+ return False
+ allow_cache = mass.config.get_raw_core_config_value(
+ "streams", CONF_ALLOW_AUDIO_CACHE, DEFAULT_ALLOW_AUDIO_CACHE
+ )
+ if allow_cache == "disabled":
+ return False
+ if not await has_enough_space(mass.streams.audio_cache_dir, REQUIRED_FREE_CACHE_SPACE):
+ return False
+ if allow_cache == "always":
+ return True
+ # auto mode
+ if streamdetails.stream_type == StreamType.ENCRYPTED_HTTP:
+ # always prefer cache for encrypted streams
+ return True
+ if not streamdetails.duration:
+ # we can't determine filesize without duration so play it safe and don't allow cache
+ return False
+ if streamdetails.stream_type == StreamType.MULTI_FILE:
+ # prefer cache to speedup multi-file streams
+ # (if total filesize smaller than 5GB)
+ max_filesize = 5 * 1024 * 1024 * 1024
+ return get_chunksize(streamdetails.audio_format, streamdetails.duration) < max_filesize
+ if streamdetails.stream_type == StreamType.CUSTOM:
+ # prefer cache for custom streams (to speedup seeking)
+ # (if total filesize smaller than 500MB)
+ max_filesize = 500 * 1024 * 1024
+ return get_chunksize(streamdetails.audio_format, streamdetails.duration) < max_filesize
+ if streamdetails.stream_type == StreamType.HLS:
+ # prefer cache for HLS streams (to speedup seeking)
+ # (if total filesize smaller than 500MB)
+ max_filesize = 500 * 1024 * 1024
+ return get_chunksize(streamdetails.audio_format, streamdetails.duration) < max_filesize
+ # deny for all other stream types
+ return False
+
+
async def get_media_stream(
mass: MusicAssistant,
streamdetails: StreamDetails,
stream_type = streamdetails.stream_type
if stream_type == StreamType.CACHE:
cache = cast(StreamCache, streamdetails.cache)
- audio_source = cache.get_audio_stream()
+ audio_source = await cache.get_audio_stream()
+ elif stream_type == StreamType.MULTI_FILE:
+ audio_source = get_multi_file_stream(mass, streamdetails)
elif stream_type == StreamType.CUSTOM:
audio_source = mass.get_provider(streamdetails.provider).get_audio_stream(
streamdetails,
# with ffmpeg, where they just stop after some minutes,
# so we tell ffmpeg to loop around in this case.
extra_input_args += ["-stream_loop", "-1", "-re"]
+ elif stream_type == StreamType.ENCRYPTED_HTTP:
+ audio_source = streamdetails.path
+ extra_input_args += ["-decryption_key", streamdetails.decryption_key]
else:
audio_source = streamdetails.path
if bytes_sent == 0:
# edge case: no audio data was sent
raise AudioError("No audio was received")
+ elif ffmpeg_proc.returncode not in (0, None):
+ raise AudioError(f"FFMpeg exited with code {ffmpeg_proc.returncode}")
finished = True
except (Exception, GeneratorExit) as err:
if isinstance(err, asyncio.CancelledError | GeneratorExit):
cancelled = True
raise
logger.error("Error while streaming %s: %s", streamdetails.uri, err)
+ # dump the last 10 lines of the log in case of an unclean exit
+ logger.warning("\n".join(list(ffmpeg_proc.log_history)[-10:]))
streamdetails.stream_error = True
finally:
# always ensure close is called which also handles all cleanup
await ffmpeg_proc.close()
-
# try to determine how many seconds we've streamed
seconds_streamed = bytes_sent / pcm_format.pcm_sample_size if bytes_sent else 0
- if not cancelled and ffmpeg_proc.returncode not in (0, 255):
- # dump the last 5 lines of the log in case of an unclean exit
- log_tail = "\n" + "\n".join(list(ffmpeg_proc.log_history)[-5:])
- else:
- log_tail = ""
logger.debug(
- "stream %s (with code %s) for %s - seconds streamed: %s %s",
+ "stream %s (with code %s) for %s - seconds streamed: %s",
"cancelled" if cancelled else "finished" if finished else "aborted",
ffmpeg_proc.returncode,
streamdetails.uri,
seconds_streamed,
- log_tail,
)
-
streamdetails.seconds_streamed = seconds_streamed
# store accurate duration
if finished and not streamdetails.seek_position and seconds_streamed:
yield data
+async def get_multi_file_stream(
+ mass: MusicAssistant, # noqa: ARG001
+ streamdetails: StreamDetails,
+) -> AsyncGenerator[bytes, None]:
+ """Return audio stream for a concatenation of multiple files."""
+ files_list: list[str] = streamdetails.data
+ # concat input files
+ temp_file = f"/tmp/{shortuuid.random(20)}.txt" # noqa: S108
+ async with aiofiles.open(temp_file, "w") as f:
+ for path in files_list:
+ await f.write(f"file '{path}'\n")
+
+ try:
+ async for chunk in get_ffmpeg_stream(
+ audio_input=temp_file,
+ input_format=streamdetails.audio_format,
+ output_format=AudioFormat(
+ content_type=ContentType.NUT,
+ sample_rate=streamdetails.audio_format.sample_rate,
+ bit_depth=streamdetails.audio_format.bit_depth,
+ channels=streamdetails.audio_format.channels,
+ ),
+ extra_input_args=["-safe", "0", "-f", "concat", "-i", temp_file],
+ ):
+ yield chunk
+ finally:
+ await remove_file(temp_file)
+
+
async def get_preview_stream(
mass: MusicAssistant,
provider_instance_id_or_domain: str,
return pcm_size
if fmt.content_type in (ContentType.WAV, ContentType.AIFF, ContentType.DSF):
return pcm_size
- if fmt.bit_rate:
+ if fmt.bit_rate and fmt.bit_rate < 10000:
return int(((fmt.bit_rate * 1000) / 8) * seconds)
if fmt.content_type in (ContentType.FLAC, ContentType.WAVPACK, ContentType.ALAC):
# assume 74.7% compression ratio (level 0)
]
if streamdetails.stream_type == StreamType.CACHE:
cache = cast(StreamCache, streamdetails.cache)
- audio_source = cache.get_audio_stream()
+ audio_source = await cache.get_audio_stream()
+ elif streamdetails.stream_type == StreamType.MULTI_FILE:
+ audio_source = get_multi_file_stream(mass, streamdetails)
elif streamdetails.stream_type == StreamType.CUSTOM:
audio_source = mass.get_provider(streamdetails.provider).get_audio_stream(
streamdetails,
self._stdin_task: asyncio.Task | None = None
self._logger_task: asyncio.Task | None = None
self._input_codec_parsed = False
+ if audio_input == "-" or isinstance(audio_input, AsyncGenerator):
+ stdin = True
+ else:
+ stdin = audio_input if isinstance(audio_input, int) else False
+ stdout = audio_output if isinstance(audio_output, int) else bool(audio_output == "-")
super().__init__(
ffmpeg_args,
- stdin=True if isinstance(audio_input, str | AsyncGenerator) else audio_input,
- stdout=True if isinstance(audio_output, str) else audio_output,
+ stdin=stdin,
+ stdout=stdout,
stderr=True,
)
self.logger = LOGGER
filter_params=filter_params,
extra_args=extra_args,
extra_input_args=extra_input_args,
+ collect_log_history=True,
) as ffmpeg_proc:
# read final chunks from stdout
iterator = ffmpeg_proc.iter_chunked(chunk_size) if chunk_size else ffmpeg_proc.iter_any()
async for chunk in iterator:
yield chunk
+ if ffmpeg_proc.returncode not in (None, 0):
+ # dump the last 5 lines of the log in case of an unclean exit
+ log_tail = "\n" + "\n".join(list(ffmpeg_proc.log_history)[-5:])
+ ffmpeg_proc.logger.error(log_tail)
def get_ffmpeg_args( # noqa: PLR0915
]
# collect input args
input_args = []
-
if extra_input_args:
input_args += extra_input_args
if input_path.startswith("http"):
]
elif input_format.codec_type != ContentType.UNKNOWN:
input_args += ["-acodec", input_format.codec_type.name.lower(), "-i", input_path]
+ elif "-f" in extra_input_args:
+ # input format is already specified in the extra input args
+ pass
else:
# let ffmpeg auto detect the content type from the metadata/headers
input_args += ["-i", input_path]
"-f",
output_format.content_type.value,
]
- elif input_format == output_format and not filter_params and not extra_args:
- # passthrough-mode (e.g. for creating the cache)
- if output_format.content_type in (
- ContentType.MP4,
- ContentType.MP4A,
- ContentType.M4A,
- ContentType.M4B,
- ):
- fmt = "adts"
- elif output_format.codec_type in (ContentType.UNKNOWN, ContentType.OGG):
- fmt = "nut" # use special nut container
- else:
- fmt = output_format.content_type.name.lower()
+ elif output_format.content_type == ContentType.NUT:
+ # passthrough-mode (for creating the cache) using NUT container
output_args = [
"-vn",
"-dn",
"-acodec",
"copy",
"-f",
- fmt,
+ "nut",
]
elif output_format.content_type == ContentType.AAC:
output_args = ["-f", "adts", "-c:a", "aac", "-b:a", "256k"]
"-compression_level",
"0",
]
-
else:
raise RuntimeError("Invalid/unsupported output format specified")
return await asyncio.to_thread(socket.inet_pton, socket.AF_INET6, ip_string)
-def get_folder_size(folderpath: str) -> float:
+async def get_folder_size(folderpath: str) -> float:
"""Return folder size in gb."""
- total_size = 0
- for dirpath, _dirnames, filenames in os.walk(folderpath):
- for _file in filenames:
- _fp = os.path.join(dirpath, _file)
- total_size += os.path.getsize(_fp)
- return total_size / float(1 << 30)
+
+ def _get_folder_size(folderpath: str) -> float:
+ total_size = 0
+ for dirpath, _dirnames, filenames in os.walk(folderpath):
+ for _file in filenames:
+ _fp = os.path.join(dirpath, _file)
+ total_size += os.path.getsize(_fp)
+ return total_size / float(1 << 30)
+
+ return await asyncio.to_thread(_get_folder_size, folderpath)
+
+
+async def clean_old_files(folderpath: str, max_size: float) -> None:
+ """Clean old files in folder to make room for new files."""
+ foldersize = await get_folder_size(folderpath)
+ if foldersize < max_size:
+ return
+
+ def _clean_old_files(foldersize: float):
+ files: list[os.DirEntry] = [x for x in os.scandir(folderpath) if x.is_file()]
+ files.sort(key=lambda x: x.stat().st_mtime)
+ for _file in files:
+ foldersize -= _file.stat().st_size / float(1 << 30)
+ os.remove(_file.path)
+ if foldersize < max_size:
+ return
+
+ await asyncio.to_thread(_clean_old_files, foldersize)
def get_changed_keys(
return False
-async def get_tmp_free_space() -> int:
- """Return free space on tmp."""
+async def get_tmp_free_space() -> float:
+ """Return free space on tmp in GB's."""
+ return await get_free_space("/tmp") # noqa: S108
+
+
+async def get_free_space(folder: str) -> float:
+ """Return free space on given folderpath in GB."""
try:
- if res := await asyncio.to_thread(shutil.disk_usage, "/tmp"): # noqa: S108
- return res.free
+ if res := await asyncio.to_thread(shutil.disk_usage, folder):
+ return res.free / float(1 << 30)
except (FileNotFoundError, OSError, PermissionError):
- return 0
+ return 0.0
+
+
+async def has_enough_space(folder: str, size: int) -> bool:
+ """Check if folder has enough free space."""
+ return await get_free_space(folder) > size
def divide_chunks(data: bytes, chunk_size: int) -> Iterator[bytes]:
)
from music_assistant_models.streamdetails import StreamDetails
-from music_assistant.helpers.ffmpeg import get_ffmpeg_stream
from music_assistant.models.music_provider import MusicProvider
from music_assistant.providers.audiobookshelf.parsers import (
parse_audiobook,
base_url = str(self.config.get_value(CONF_URL))
if len(tracks) == 0:
raise MediaNotFoundError("Stream not found")
+
+ content_type = ContentType.UNKNOWN
+ if abs_audiobook.media.tracks[0].metadata is not None:
+ content_type = ContentType.try_parse(abs_audiobook.media.tracks[0].metadata.ext)
+
if len(tracks) > 1:
self.logger.debug("Using playback for multiple file audiobook.")
- multiple_files = []
+ multiple_files: list[str] = []
for track in tracks:
- media_url = track.content_url
- stream_url = f"{base_url}{media_url}?token={token}"
- content_type = ContentType.UNKNOWN
- if track.metadata is not None:
- content_type = ContentType.try_parse(track.metadata.ext)
- multiple_files.append(
- (AudioFormat(content_type=content_type), stream_url, track.duration)
- )
+ stream_url = f"{base_url}{track.content_url}?token={token}"
+ multiple_files.append(stream_url)
return StreamDetails(
provider=self.instance_id,
item_id=abs_audiobook.id_,
- # for the concatanated stream, we need to use a pcm stream format
- audio_format=AudioFormat(
- content_type=ContentType.PCM_S16LE,
- sample_rate=44100,
- bit_depth=16,
- channels=2,
- ),
+ audio_format=AudioFormat(content_type=content_type),
media_type=MediaType.AUDIOBOOK,
- stream_type=StreamType.CUSTOM,
+ stream_type=StreamType.MULTI_FILE,
duration=int(abs_audiobook.media.duration),
data=multiple_files,
allow_seek=True,
- can_seek=True,
)
self.logger.debug(
f'Using direct playback for audiobook "{abs_audiobook.media.metadata.title}".'
)
-
- track = abs_audiobook.media.tracks[0]
- media_url = track.content_url
+ media_url = abs_audiobook.media.tracks[0].content_url
stream_url = f"{base_url}{media_url}?token={token}"
- content_type = ContentType.UNKNOWN
- if track.metadata is not None:
- content_type = ContentType.try_parse(track.metadata.ext)
+
return StreamDetails(
provider=self.lookup_key,
item_id=abs_audiobook.id_,
allow_seek=True,
)
- async def get_audio_stream(
- self, streamdetails: StreamDetails, seek_position: int = 0
- ) -> AsyncGenerator[bytes, None]:
- """
- Return the (custom) audio stream for the provider item.
-
- Only used for multi-file audiobooks.
- """
- stream_data: list[tuple[AudioFormat, str, float]] = streamdetails.data
- total_duration = 0.0
- for audio_format, chapter_file, chapter_duration in stream_data:
- total_duration += chapter_duration
- if total_duration < seek_position:
- continue
- seek_position_netto = round(
- max(0, seek_position - (total_duration - chapter_duration)), 2
- )
- self.logger.debug(chapter_file)
- async for chunk in get_ffmpeg_stream(
- chapter_file,
- input_format=audio_format,
- # output format is always pcm because we are sending
- # the result of multiple files as one big stream
- output_format=streamdetails.audio_format,
- extra_input_args=["-ss", str(seek_position_netto)] if seek_position_netto else [],
- ):
- yield chunk
-
async def get_resume_position(self, item_id: str, media_type: MediaType) -> tuple[bool, int]:
"""Return finished:bool, position_ms: int."""
progress: None | MediaProgress = None
VARIOUS_ARTISTS_NAME,
)
from music_assistant.helpers.compare import compare_strings, create_safe_string
-from music_assistant.helpers.ffmpeg import get_ffmpeg_stream
from music_assistant.helpers.json import json_loads
from music_assistant.helpers.playlists import parse_m3u, parse_pls
from music_assistant.helpers.tags import AudioTags, async_parse_tags, parse_tags, split_items
return await self._get_stream_details_for_podcast_episode(item_id)
return await self._get_stream_details_for_track(item_id)
- async def get_audio_stream(
- self, streamdetails: StreamDetails, seek_position: int = 0
- ) -> AsyncGenerator[bytes, None]:
- """
- Return the (custom) audio stream for the provider item.
-
- Will only be called when the stream_type is set to CUSTOM,
- currently only for multi-part audiobooks.
- """
- stream_data: tuple[AudioFormat, list[tuple[str, float]]] = streamdetails.data
- format_org, file_based_chapters = stream_data
- total_duration = 0.0
- for chapter_file, chapter_duration in file_based_chapters:
- total_duration += chapter_duration
- if total_duration < seek_position:
- continue
- seek_position_netto = round(
- max(0, seek_position - (total_duration - chapter_duration)), 2
- )
- async for chunk in get_ffmpeg_stream(
- self.get_absolute_path(chapter_file),
- input_format=format_org,
- # output format is always pcm because we are sending
- # the result of multiple files as one big stream
- output_format=streamdetails.audio_format,
- extra_input_args=["-ss", str(seek_position_netto)] if seek_position_netto else [],
- ):
- yield chunk
-
async def resolve_image(self, path: str) -> str | bytes:
"""
Resolve an image from an image path.
prov_mapping = next(x for x in library_item.provider_mappings if x.item_id == item_id)
file_item = await self.resolve(item_id)
-
- file_based_chapters: list[tuple[str, float]] | None
- if file_based_chapters := await self.cache.get(
+ duration = library_item.duration
+ chapters_cache_key = f"{self.lookup_key}.audiobook.chapters"
+ file_based_chapters: list[tuple[str, float]] | None = await self.cache.get(
file_item.relative_path,
- base_key=f"{self.lookup_key}.audiobook.chapters",
- ):
- # this is a multi-file audiobook, we have the chapter(files) stored in cache
- # use custom stream to simply send the chapter files one by one
+ base_key=chapters_cache_key,
+ )
+ if file_based_chapters is None:
+            # no cached chapters for this audiobook yet; parse them now (which also
+            # populates the cache) and then fetch the freshly cached result
+ tags = await async_parse_tags(file_item.absolute_path, file_item.file_size)
+ await self._parse_audiobook(file_item, tags)
+ file_based_chapters = await self.cache.get(
+ file_item.relative_path,
+ base_key=chapters_cache_key,
+ )
+
+ if file_based_chapters:
+            # this is a multi-file audiobook: hand the individual chapter files
+            # to the multi-file stream so they play as one continuous stream
return StreamDetails(
provider=self.instance_id,
item_id=item_id,
- # for the concatanated stream, we need to use a pcm stream format
- audio_format=AudioFormat(
- content_type=ContentType.from_bit_depth(prov_mapping.audio_format.bit_depth),
- sample_rate=prov_mapping.audio_format.sample_rate,
- channels=prov_mapping.audio_format.channels,
- ),
+ audio_format=prov_mapping.audio_format,
media_type=MediaType.AUDIOBOOK,
- stream_type=StreamType.CUSTOM,
- duration=library_item.duration,
- data=(prov_mapping.audio_format, file_based_chapters),
+ stream_type=StreamType.MULTI_FILE,
+ duration=duration,
+ data=[self.get_absolute_path(x[0]) for x in file_based_chapters],
allow_seek=True,
- can_seek=True,
)
# regular single-file streaming, simply let ffmpeg deal with the file directly
) -> tuple[int, list[MediaItemChapter]]:
"""Return the chapters for an audiobook."""
chapters: list[MediaItemChapter] = []
+ all_chapter_files: list[tuple[str, float]] = []
+ total_duration = 0.0
if tags.chapters:
- # The chapters are embedded in the file
+ # The chapters are embedded in the file tags
chapters = [
MediaItemChapter(
position=chapter.chapter_id,
)
for chapter in tags.chapters
]
- return (try_parse_int(tags.duration) or 0, chapters)
- # there could be multiple files for this audiobook in the same folder,
- # where each file is a portion/chapter of the audiobook
- # try to gather the chapters by traversing files in the same folder
- chapter_file_tags: list[AudioTags] = []
- total_duration = 0.0
- abs_path = self.get_absolute_path(audiobook_file_item.parent_path)
- for item in await asyncio.to_thread(sorted_scandir, self.base_path, abs_path, sort=True):
- if "." not in item.relative_path or item.is_dir:
- continue
- if item.ext not in AUDIOBOOK_EXTENSIONS:
- continue
- item_tags = await async_parse_tags(item.absolute_path, item.file_size)
- if not (tags.album == item_tags.album or (item_tags.tags.get("title") is None)):
- continue
- if item_tags.track is None:
- continue
- chapter_file_tags.append(item_tags)
- chapter_file_tags.sort(key=lambda x: x.track or 0)
- all_chapter_files: list[tuple[str, float]] = []
- for chapter_tags in chapter_file_tags:
- assert chapter_tags.duration is not None
- chapters.append(
- MediaItemChapter(
- position=chapter_tags.track or 0,
- name=chapter_tags.title,
- start=total_duration,
- end=total_duration + chapter_tags.duration,
+ total_duration = try_parse_int(tags.duration) or 0
+ else:
+            # there could be multiple files for this audiobook in the same folder,
+            # where each file is a portion/chapter of the audiobook;
+            # try to gather the chapters by traversing files in the same folder
+ chapter_file_tags: list[AudioTags] = []
+ abs_path = self.get_absolute_path(audiobook_file_item.parent_path)
+ for item in await asyncio.to_thread(
+ sorted_scandir, self.base_path, abs_path, sort=True
+ ):
+ if "." not in item.relative_path or item.is_dir:
+ continue
+ if item.ext not in AUDIOBOOK_EXTENSIONS:
+ continue
+ item_tags = await async_parse_tags(item.absolute_path, item.file_size)
+ if not (tags.album == item_tags.album or (item_tags.tags.get("title") is None)):
+ continue
+ if item_tags.track is None:
+ continue
+ chapter_file_tags.append(item_tags)
+ chapter_file_tags.sort(key=lambda x: x.track or 0)
+ for chapter_tags in chapter_file_tags:
+ assert chapter_tags.duration is not None
+ chapters.append(
+ MediaItemChapter(
+ position=chapter_tags.track or 0,
+ name=chapter_tags.title,
+ start=total_duration,
+ end=total_duration + chapter_tags.duration,
+ )
)
- )
- all_chapter_files.append(
- (get_relative_path(self.base_path, chapter_tags.filename), chapter_tags.duration)
- )
- total_duration += chapter_tags.duration
+ all_chapter_files.append(
+ (
+ get_relative_path(self.base_path, chapter_tags.filename),
+ chapter_tags.duration,
+ )
+ )
+ total_duration += chapter_tags.duration
+
# store chapter files in cache
# for easy access from streamdetails
await self.cache.set(
"ifaddr==0.2.0",
"mashumaro==3.15",
"music-assistant-frontend==2.13.1",
- "music-assistant-models==1.1.35",
+ "music-assistant-models==1.1.36",
"mutagen==1.47.0",
"orjson==3.10.15",
"pillow==11.1.0",
liblistenbrainz==0.5.6
mashumaro==3.15
music-assistant-frontend==2.13.1
-music-assistant-models==1.1.35
+music-assistant-models==1.1.36
mutagen==1.47.0
orjson==3.10.15
pillow==11.1.0