MusicAssistantError,
ProviderUnavailableError,
)
-from music_assistant_models.helpers import get_global_cache_value, set_global_cache_values
from music_assistant_models.streamdetails import AudioFormat
from music_assistant.constants import (
from music_assistant.helpers.json import JSON_DECODE_EXCEPTIONS, json_loads
from music_assistant.helpers.util import clean_stream_title
+from .datetime import utc
from .dsp import filter_to_ffmpeg_params
from .ffmpeg import FFMpeg, get_ffmpeg_stream
from .playlists import IsHLSPlaylist, PlaylistItem, fetch_playlist, parse_m3u
from .process import AsyncProcess, communicate
-from .util import detect_charset, has_tmpfs_mount
+from .util import detect_charset, get_tmp_free_space
if TYPE_CHECKING:
from music_assistant_models.config_entries import CoreConfig, PlayerConfig
"""
StreamCache.
- Basic class to handle temporary caching of audio streams.
- For now, based on a (in-memory) tempfile and ffmpeg.
+ Basic class to handle (temporary) in-memory caching of audio streams.
+    Useful for slow or unreliable network connections, to allow faster seeking,
+    or when the audio source itself is slow.
"""
- def acquire(self) -> str:
+ @property
+ def data_complete(self) -> bool:
+ """Return if the cache is complete."""
+ return self._fetch_task is not None and self._fetch_task.done()
+
+ async def acquire(self) -> str | AsyncGenerator[bytes, None]:
"""Acquire the cache and return the cache file path."""
- # for the edge case where the cache file is not released,
- # set a fallback timer to remove the file after 20 minutes
- self.mass.call_later(
- 20 * 60, remove_file, self._temp_path, task_id=f"remove_file_{self._temp_path}"
- )
- return self._temp_path
+ self.mass.cancel_timer(f"clear_cache_{self._temp_path}")
+ if not self.data_complete and not self._first_part_received.is_set():
+ # handle the situation where the cache
+ # file is not created yet or already removed
+ await self.create()
+ self._subscribers += 1
+ if self._all_data_written.is_set():
+ # cache is completely written, return the path
+ return self._temp_path
+ return self._stream_from_cache()
def release(self) -> None:
"""Release the cache file."""
- # edge case: MA is closing, clean down the file immediately
- if self.mass.closing:
- os.remove(self._temp_path)
- return
- # set a timer to remove the file after 1 minute
- # if the file is accessed again within this 1 minute, the timer will be cancelled
- self.mass.call_later(
- 60, remove_file, self._temp_path, task_id=f"remove_file_{self._temp_path}"
- )
+ self._subscribers -= 1
+ if self._subscribers == 0:
+ # set a timer to remove the tempfile after 1 minute
+ # if the file is accessed again within this period,
+ # the timer will be cancelled
+ self.mass.call_later(60, self._clear, task_id=f"clear_cache_{self._temp_path}")
def __init__(self, mass: MusicAssistant, streamdetails: StreamDetails) -> None:
"""Initialize the StreamCache."""
self.mass = mass
self.streamdetails = streamdetails
- ext = streamdetails.audio_format.output_format_str
- self._temp_path = f"/tmp/{shortuuid.random(20)}.{ext}" # noqa: S108
+ self.logger = LOGGER.getChild("cache")
+ self._temp_path = f"/tmp/{shortuuid.random(20)}" # noqa: S108
self._fetch_task: asyncio.Task | None = None
+ self._subscribers: int = 0
+ self._first_part_received = asyncio.Event()
+ self._all_data_written = asyncio.Event()
self.org_path: str | None = streamdetails.path
self.org_stream_type: StreamType | None = streamdetails.stream_type
self.org_extra_input_args: list[str] | None = streamdetails.extra_input_args
streamdetails.path = self._temp_path
- streamdetails.stream_type = StreamType.CACHE_FILE
+ streamdetails.stream_type = StreamType.CACHE
+ streamdetails.can_seek = True
+ streamdetails.allow_seek = True
streamdetails.extra_input_args = []
async def create(self) -> None:
"""Create the cache file (if needed)."""
+ self.mass.cancel_timer(f"clear_cache_{self._temp_path}")
if await asyncio.to_thread(os.path.exists, self._temp_path):
return
if self._fetch_task is not None and not self._fetch_task.done():
# fetch task is already busy
return
self._fetch_task = self.mass.create_task(self._create_cache_file())
- # for the edge case where the cache file is not consumed at all,
- # set a fallback timer to remove the file after 1 hour
- self.mass.call_later(
- 3600, remove_file, self._temp_path, task_id=f"remove_file_{self._temp_path}"
- )
-
- async def wait(self) -> None:
- """
- Wait until the cache is ready.
-
- Optionally wait until the full file is available (e.g. when seeking).
- """
- await self._fetch_task
+ # wait until the first part of the file is received
+ await self._first_part_received.wait()
async def _create_cache_file(self) -> None:
time_start = time.time()
- LOGGER.log(VERBOSE_LOG_LEVEL, "Fetching audio stream to cache file %s", self._temp_path)
-
+ self.logger.debug(
+ "Fetching audio stream for %s",
+ self.streamdetails.uri,
+ )
if self.org_stream_type == StreamType.CUSTOM:
audio_source = self.mass.get_provider(self.streamdetails.provider).get_audio_stream(
self.streamdetails,
extra_input_args = self.org_extra_input_args or []
if self.streamdetails.decryption_key:
- extra_input_args += ["-decryption_key", self.streamdetails.decryption_key]
-
- ffmpeg = FFMpeg(
+ extra_input_args += [
+ "-decryption_key",
+ self.streamdetails.decryption_key,
+ ]
+
+ # we always use an intermediate ffmpeg process to fetch the original audio source
+ # this may feel a bit redundant, but it's the most reliable way to fetch the audio
+    # because ffmpeg has all the logic to handle different audio formats, codecs, etc.
+ # and it also accounts for complicated cases such as encrypted streams or
+ # m4a/mp4 streams with the moov atom at the end of the file.
+ # ffmpeg will produce a lossless copy of the original codec to stdout.
+ self._first_part_received.clear()
+ self._all_data_written.clear()
+ required_bytes = get_chunksize(self.streamdetails.audio_format, 2)
+ async with FFMpeg(
audio_input=audio_source,
input_format=self.streamdetails.audio_format,
output_format=self.streamdetails.audio_format,
- extra_input_args=["-y", *extra_input_args],
+ extra_input_args=extra_input_args,
audio_output=self._temp_path,
+ ) as ffmpeg_proc:
+ # wait until the first part of the file is received
+ while ffmpeg_proc.returncode is None:
+ await asyncio.sleep(0.05)
+ if not await asyncio.to_thread(os.path.exists, self._temp_path):
+ continue
+ if await asyncio.to_thread(os.path.getsize, self._temp_path) >= required_bytes:
+ break
+ self._first_part_received.set()
+ self.logger.debug(
+ "First part received for %s after %.2fs",
+ self.streamdetails.uri,
+ time.time() - time_start,
+ )
+ # wait until ffmpeg is done
+ await ffmpeg_proc.wait()
+ self._all_data_written.set()
+
+ LOGGER.debug(
+ "Writing all data for %s done in %.2fs",
+ self.streamdetails.uri,
+ time.time() - time_start,
)
- await ffmpeg.start()
- await ffmpeg.wait()
- process_time = int((time.time() - time_start) * 1000)
- LOGGER.log(
- VERBOSE_LOG_LEVEL,
- "Writing cache file %s done in %s milliseconds",
- self._temp_path,
- process_time,
- )
+
+ async def _stream_from_cache(self) -> AsyncGenerator[bytes, None]:
+ """Stream audio from cachefile (while its still being written)."""
+ async with aiofiles.open(self._temp_path, "rb", buffering=0) as _file:
+ while True:
+ chunk = await _file.read(64000)
+ if not chunk and self._all_data_written.is_set():
+ break
+ elif not chunk:
+ await asyncio.sleep(0.05)
+ else:
+ yield chunk
+
+ async def _clear(self) -> None:
+ """Clear the cache."""
+ self._first_part_received.clear()
+ self._all_data_written.clear()
+ self._fetch_task = None
+ await remove_file(self._temp_path)
def __del__(self) -> None:
"""Ensure the temp file gets cleaned up."""
+ if self.mass.closing:
+        # edge case: MA is closing, clean up the file immediately
+ if os.path.isfile(self._temp_path):
+ os.remove(self._temp_path)
+ return
self.mass.loop.call_soon_threadsafe(self.mass.create_task, remove_file(self._temp_path))
+        self.mass.cancel_timer(f"clear_cache_{self._temp_path}")
async def crossfade_pcm_parts(
seek_position: int = 0,
fade_in: bool = False,
prefer_album_loudness: bool = False,
- is_start: bool = True,
) -> StreamDetails:
"""
Get streamdetails for the given QueueItem.
raise MediaNotFoundError(
f"Unable to retrieve streamdetails for {queue_item.name} ({queue_item.uri})"
)
- if queue_item.streamdetails and (
- not queue_item.streamdetails.seconds_streamed
- or queue_item.streamdetails.stream_type == StreamType.CACHE_FILE
- ):
+    if (
+        queue_item.streamdetails
+        and (utc() - queue_item.streamdetails.created_at).total_seconds() < 1800
+    ):
# already got a fresh/unused (or cached) streamdetails
+ # we assume that the streamdetails are valid for max 30 minutes
streamdetails = queue_item.streamdetails
else:
+ # retrieve streamdetails from provider
media_item = queue_item.media_item
# sort by quality and check item's availability
for prov_media in sorted(
f"Unable to retrieve streamdetails for {queue_item.name} ({queue_item.uri})"
)
- # work out how to handle radio stream
- if (
- streamdetails.stream_type in (StreamType.ICY, StreamType.HLS, StreamType.HTTP)
- and streamdetails.media_type == MediaType.RADIO
- ):
- resolved_url, stream_type = await resolve_radio_stream(mass, streamdetails.path)
- streamdetails.path = resolved_url
- streamdetails.stream_type = stream_type
+ # work out how to handle radio stream
+ if (
+ streamdetails.stream_type in (StreamType.ICY, StreamType.HLS, StreamType.HTTP)
+ and streamdetails.media_type == MediaType.RADIO
+ ):
+ resolved_url, stream_type = await resolve_radio_stream(mass, streamdetails.path)
+ streamdetails.path = resolved_url
+ streamdetails.stream_type = stream_type
+
# set queue_id on the streamdetails so we know what is being streamed
streamdetails.queue_id = queue_item.queue_id
# handle skip/fade_in details
# attach the DSP details of all group members
streamdetails.dsp = get_stream_dsp_details(mass, streamdetails.queue_id)
- process_time = int((time.time() - time_start) * 1000)
LOGGER.debug(
"retrieved streamdetails for %s in %s milliseconds",
queue_item.uri,
- process_time,
+ int((time.time() - time_start) * 1000),
)
if streamdetails.decryption_key:
# determine if we may use a temporary cache for the audio stream
if streamdetails.enable_cache is None:
- tmpfs_present = get_global_cache_value("tmpfs_present")
- if tmpfs_present is None:
- tmpfs_present = await has_tmpfs_mount()
- await set_global_cache_values({"tmpfs_present": tmpfs_present})
streamdetails.enable_cache = (
- not is_start
- and tmpfs_present
- and streamdetails.duration is not None
- and streamdetails.duration < 1800
+ streamdetails.duration is not None
+ and streamdetails.media_type
+ in (MediaType.TRACK, MediaType.AUDIOBOOK, MediaType.PODCAST_EPISODE)
and streamdetails.stream_type
in (StreamType.HTTP, StreamType.ENCRYPTED_HTTP, StreamType.CUSTOM, StreamType.HLS)
+ and streamdetails.audio_format.content_type != ContentType.UNKNOWN
+ and await get_tmp_free_space() > 512 * 1024 * 1024
+ and get_chunksize(streamdetails.audio_format, streamdetails.duration) < 100000000
)
# handle temporary cache support of audio stream
streamdetails.cache = StreamCache(mass, streamdetails)
else:
streamdetails.cache = cast(StreamCache, streamdetails.cache)
+ # create cache (if needed) and wait until the cache is available
await streamdetails.cache.create()
- # wait until the cache file is available
- await streamdetails.cache.wait()
+ LOGGER.debug(
+ "streamdetails cache ready for %s in %s milliseconds",
+ queue_item.uri,
+ int((time.time() - time_start) * 1000),
+ )
return streamdetails
mass: MusicAssistant,
streamdetails: StreamDetails,
pcm_format: AudioFormat,
- audio_source: AsyncGenerator[bytes, None] | str,
filter_params: list[str] | None = None,
- extra_input_args: list[str] | None = None,
) -> AsyncGenerator[bytes, None]:
"""Get PCM audio stream for given media details."""
logger = LOGGER.getChild("media_stream")
logger.log(VERBOSE_LOG_LEVEL, "Starting media stream for %s", streamdetails.uri)
+ extra_input_args = streamdetails.extra_input_args or []
strip_silence_begin = streamdetails.strip_silence_begin
strip_silence_end = streamdetails.strip_silence_end
if streamdetails.fade_in:
filter_params.append("afade=type=in:start_time=0:duration=3")
strip_silence_begin = False
- if streamdetails.stream_type == StreamType.CACHE_FILE:
+ # work out audio source for these streamdetails
+ stream_type = streamdetails.stream_type
+ if stream_type == StreamType.CACHE:
cache = cast(StreamCache, streamdetails.cache)
- audio_source = cache.acquire()
+ audio_source = await cache.acquire()
+ elif stream_type == StreamType.CUSTOM:
+ audio_source = mass.get_provider(streamdetails.provider).get_audio_stream(
+ streamdetails,
+ seek_position=streamdetails.seek_position,
+ )
+ elif stream_type == StreamType.ICY:
+ audio_source = get_icy_radio_stream(mass, streamdetails.path, streamdetails)
+ elif stream_type == StreamType.HLS:
+ substream = await get_hls_substream(mass, streamdetails.path)
+ audio_source = substream.path
+ if streamdetails.media_type == MediaType.RADIO:
+ # Especially the BBC streams struggle when they're played directly
+ # with ffmpeg, where they just stop after some minutes,
+ # so we tell ffmpeg to loop around in this case.
+ extra_input_args += ["-stream_loop", "-1", "-re"]
+ else:
+ audio_source = streamdetails.path
+
+ # handle seek support
+ if (
+ streamdetails.seek_position
+ and streamdetails.duration
+ and streamdetails.allow_seek
+ # allow seeking for custom streams,
+        # but only for custom streams that can't seek themselves
+ and (stream_type != StreamType.CUSTOM or not streamdetails.can_seek)
+ ):
+ extra_input_args += ["-ss", str(int(streamdetails.seek_position))]
bytes_sent = 0
chunk_number = 0
req_buffer_size = int(pcm_format.pcm_sample_size * 5)
elif chunk_number > 240 and strip_silence_end:
req_buffer_size = int(pcm_format.pcm_sample_size * 10)
- elif chunk_number > 60 and strip_silence_end:
+ elif chunk_number > 120 and strip_silence_end:
req_buffer_size = int(pcm_format.pcm_sample_size * 8)
- elif chunk_number > 30:
+ elif chunk_number > 60:
+ req_buffer_size = int(pcm_format.pcm_sample_size * 6)
+ elif chunk_number > 20 and strip_silence_end:
req_buffer_size = int(pcm_format.pcm_sample_size * 4)
- elif chunk_number > 10 and strip_silence_end:
- req_buffer_size = int(pcm_format.pcm_sample_size * 2)
else:
- req_buffer_size = pcm_format.pcm_sample_size
+ req_buffer_size = pcm_format.pcm_sample_size * 2
# always append to buffer
buffer += chunk
# buffer is not full enough, move on
continue
- if chunk_number == 5 and strip_silence_begin:
+ if strip_silence_begin:
# strip silence from begin of audio
+ strip_silence_begin = False
chunk = await strip_silence( # noqa: PLW2901
mass, buffer, pcm_format=pcm_format
)
):
mass.create_task(music_prov.on_streamed(streamdetails))
- # schedule removal of cache file
- if streamdetails.stream_type == StreamType.CACHE_FILE:
+ # release cache file
+ if streamdetails.stream_type == StreamType.CACHE:
cache = cast(StreamCache, streamdetails.cache)
cache.release()
"-t",
"600",
]
- if streamdetails.stream_type == StreamType.CUSTOM:
+ if streamdetails.stream_type == StreamType.CACHE:
+ cache = cast(StreamCache, streamdetails.cache)
+ audio_source = await cache.acquire()
+ elif streamdetails.stream_type == StreamType.CUSTOM:
audio_source = mass.get_provider(streamdetails.provider).get_audio_stream(
streamdetails,
)
streamdetails.uri,
loudness,
)
+ # release cache file
+ if streamdetails.stream_type == StreamType.CACHE:
+ cache = cast(StreamCache, streamdetails.cache)
+ cache.release()
def _get_normalization_mode(
clean_args.append("<URL>")
elif "/" in arg and "." in arg:
clean_args.append("<FILE>")
+ elif arg.startswith("data:application/"):
+ clean_args.append("<DATA>")
else:
clean_args.append(arg)
args_str = " ".join(clean_args)
if self.collect_log_history:
self.log_history.append(line)
if "error" in line or "warning" in line:
- self.logger.debug(line)
- elif "critical" in line:
self.logger.warning(line)
+ elif "critical" in line:
+ self.logger.error(line)
else:
self.logger.log(VERBOSE_LOG_LEVEL, line)
try:
start = time.time()
self.logger.debug("Start reading audio data from source...")
- # use TimedAsyncGenerator to catch we're stuck waiting on data forever
- # don't set this timeout too low because in some cases it can indeed take a while
- # for data to arrive (e.g. when there is X amount of seconds in the buffer)
- # so this timeout is just to catch if the source is stuck and rpeort it and not
- # to recover from it.
- # async for chunk in TimedAsyncGenerator(self.audio_input, timeout=300):
- # if self.closed:
- # return
- # await self.write(chunk)
async for chunk in self.audio_input:
if self.closed:
return
extra_args: list[str] | None = None,
chunk_size: int | None = None,
extra_input_args: list[str] | None = None,
+ collect_log_history: bool = False,
+ loglevel: str = "info",
) -> AsyncGenerator[bytes, None]:
"""
Get the ffmpeg audio stream as async generator.
filter_params=filter_params,
extra_args=extra_args,
extra_input_args=extra_input_args,
+ collect_log_history=collect_log_history,
+ loglevel=loglevel,
) as ffmpeg_proc:
# read final chunks from stdout
iterator = ffmpeg_proc.iter_chunked(chunk_size) if chunk_size else ffmpeg_proc.iter_any()
yield chunk
-def get_ffmpeg_args(
+def get_ffmpeg_args( # noqa: PLR0915
input_format: AudioFormat,
output_format: AudioFormat,
filter_params: list[str],
"-ignore_unknown",
"-protocol_whitelist",
"file,hls,http,https,tcp,tls,crypto,pipe,data,fd,rtp,udp,concat",
- "-probesize",
- "8192",
]
# collect input args
input_args = []
"-i",
input_path,
]
+ elif input_format.codec_type != ContentType.UNKNOWN:
+ input_args += ["-acodec", input_format.codec_type.name.lower(), "-i", input_path]
else:
# let ffmpeg auto detect the content type from the metadata/headers
input_args += ["-i", input_path]
# devnull stream
output_path = "-"
output_args = ["-f", "null"]
+ elif output_format.content_type.is_pcm():
+ # use explicit format identifier for pcm formats
+ output_args += [
+ "-ar",
+ str(output_format.sample_rate),
+ "-acodec",
+ output_format.content_type.name.lower(),
+ "-f",
+ output_format.content_type.value,
+ ]
+ elif input_format == output_format:
+ # passthrough
+ if output_format.content_type in (
+ ContentType.MP4,
+ ContentType.MP4A,
+ ContentType.M4A,
+ ContentType.M4B,
+ ):
+ fmt = "adts"
+ elif output_format.codec_type != ContentType.UNKNOWN:
+ fmt = output_format.codec_type.name.lower()
+ else:
+ fmt = output_format.content_type.name.lower()
+ output_args = [
+ "-vn",
+ "-dn",
+ "-sn",
+ "-acodec",
+ "copy",
+ "-f",
+ fmt,
+ ]
elif output_format.content_type == ContentType.AAC:
output_args = ["-f", "adts", "-c:a", "aac", "-b:a", "256k"]
elif output_format.content_type == ContentType.MP3:
"-compression_level",
"0",
]
- elif output_format.content_type.is_pcm():
- # use explicit format identifier for pcm formats
- output_args += [
- "-ar",
- str(output_format.sample_rate),
- "-acodec",
- output_format.content_type.name.lower(),
- "-f",
- output_format.content_type.value,
- ]
- elif input_format == output_format:
- # passthrough
- output_args = ["-c", "copy"]
+
else:
raise RuntimeError("Invalid/unsupported output format specified")