from music_assistant_models.streamdetails import AudioFormat
from music_assistant.constants import (
+ CONF_ALLOW_MEMORY_CACHE,
CONF_ENTRY_OUTPUT_LIMITER,
CONF_OUTPUT_CHANNELS,
CONF_VOLUME_NORMALIZATION,
CONF_VOLUME_NORMALIZATION_RADIO,
CONF_VOLUME_NORMALIZATION_TARGET,
CONF_VOLUME_NORMALIZATION_TRACKS,
+ DEFAULT_ALLOW_MEMORY_CACHE,
MASS_LOGGER_NAME,
VERBOSE_LOG_LEVEL,
)
from .ffmpeg import FFMpeg, get_ffmpeg_stream
from .playlists import IsHLSPlaylist, PlaylistItem, fetch_playlist, parse_m3u
from .process import AsyncProcess, communicate
-from .util import detect_charset, get_tmp_free_space
+from .util import detect_charset
if TYPE_CHECKING:
from music_assistant_models.config_entries import CoreConfig, PlayerConfig
"""
StreamCache.
- Basic class to handle (temporary) caching of audio streams.
+ Basic class to handle (temporary) in-memory caching of audio streams.
Useful in case of slow or unreliable network connections, faster seeking,
or when the audio stream is slow itself.
-
- The cache is stored in a file on disk so ffmpeg can access it directly.
- After 1 minute of inactivity, the cache file will be removed.
-
- Because we use /tmp as the cache location, and on our systems /tmp is mounted as tmpfs,
- the cache will be stored in memory and not on the disk.
"""
- @property
- def data_complete(self) -> bool:
- """Return if the cache is complete."""
- return self._fetch_task is not None and self._fetch_task.done()
-
- async def acquire(self) -> str | AsyncGenerator[bytes, None]:
- """Acquire the cache and return the cache file path."""
- self.mass.cancel_timer(f"clear_cache_{self._temp_path}")
- if not self.data_complete and not self._first_part_received.is_set():
- # handle the situation where the cache
- # file is not created yet or already removed
- await self.create()
- self._subscribers += 1
- if self._all_data_written.is_set():
- # cache is completely written, return the path
- return self._temp_path
- return self._stream_from_cache()
-
- def release(self) -> None:
- """Release the cache file."""
- self._subscribers -= 1
- if self._subscribers == 0:
- # set a timer to remove the tempfile after 1 minute
- # if the file is accessed again within this period,
- # the timer will be cancelled
- self.mass.call_later(60, self._clear, task_id=f"clear_cache_{self._temp_path}")
-
- def __init__(self, mass: MusicAssistant, streamdetails: StreamDetails) -> None:
- """Initialize the StreamCache."""
- self.mass = mass
- self.streamdetails = streamdetails
- self.logger = LOGGER.getChild("cache")
- self._temp_path = f"/tmp/{shortuuid.random(20)}" # noqa: S108
- self._fetch_task: asyncio.Task | None = None
- self._subscribers: int = 0
- self._first_part_received = asyncio.Event()
- self._all_data_written = asyncio.Event()
- self.org_path: str | None = streamdetails.path
- self.org_stream_type: StreamType | None = streamdetails.stream_type
- self.org_extra_input_args: list[str] | None = streamdetails.extra_input_args
- streamdetails.path = self._temp_path
- streamdetails.stream_type = StreamType.CACHE
- streamdetails.can_seek = True
- streamdetails.allow_seek = True
- streamdetails.extra_input_args = []
-
async def create(self) -> None:
"""Create the cache file (if needed)."""
- self.mass.cancel_timer(f"clear_cache_{self._temp_path}")
- if await asyncio.to_thread(os.path.exists, self._temp_path):
- return
- if self._fetch_task is not None and not self._fetch_task.done():
- # fetch task is already busy
- return
- self._fetch_task = self.mass.create_task(self._create_cache_file())
+ self.mass.cancel_timer(f"clear_cache_{self.cache_id}")
+ if self._fetch_task is None:
+ self._fetch_task = self.mass.create_task(self._fill_cache())
# wait until the first part of the file is received
await self._first_part_received.wait()
- async def _create_cache_file(self) -> None:
+ async def get_audio_stream(self) -> AsyncGenerator[bytes, None]:
+ """Stream audio from the cache data (which may still be in the process of being written)."""
+ try:
+ self._subscribers += 1
+ bytes_read = 0
+ chunksize = 64000
+ await self.create()
+ while True:
+ async with self._lock:
+ chunk = self._data[bytes_read : bytes_read + chunksize]
+ bytes_read += len(chunk)
+ if len(chunk) < chunksize and self._all_data_written.is_set():
+ # reached EOF
+ break
+ elif not chunk:
+ # data is not yet available, wait a bit
+ await asyncio.sleep(0.05)
+ else:
+ yield chunk
+ del chunk
+ finally:
+ self._subscribers -= 1
+ if self._subscribers == 0:
+ # set a timer to clear the in-memory cache data after 1 minute
+ # if the cache is accessed again within this period,
+ # the timer will be cancelled
+ self.mass.call_later(60, self._clear, task_id=f"clear_cache_{self.cache_id}")
+
+ async def _fill_cache(self) -> None:
time_start = time.time()
- self.logger.debug(
- "Fetching audio stream for %s",
- self.streamdetails.uri,
- )
+ self.logger.debug("Fetching audio stream for %s", self.streamdetails.uri)
if self.org_stream_type == StreamType.CUSTOM:
audio_source = self.mass.get_provider(self.streamdetails.provider).get_audio_stream(
self.streamdetails,
# ffmpeg will produce a lossless copy of the original codec to stdout.
self._first_part_received.clear()
self._all_data_written.clear()
- required_bytes = get_chunksize(self.streamdetails.audio_format, 2)
- async with FFMpeg(
+ self._data = b""
+ async for chunk in get_ffmpeg_stream(
audio_input=audio_source,
input_format=self.streamdetails.audio_format,
output_format=self.streamdetails.audio_format,
extra_input_args=extra_input_args,
- audio_output=self._temp_path,
- ) as ffmpeg_proc:
- # wait until the first part of the file is received
- while ffmpeg_proc.returncode is None:
- await asyncio.sleep(0.05)
- if not await asyncio.to_thread(os.path.exists, self._temp_path):
- continue
- if await asyncio.to_thread(os.path.getsize, self._temp_path) >= required_bytes:
- break
- self._first_part_received.set()
- self.logger.debug(
- "First part received for %s after %.2fs",
- self.streamdetails.uri,
- time.time() - time_start,
- )
- # wait until ffmpeg is done
- await ffmpeg_proc.wait()
- self._all_data_written.set()
-
- LOGGER.debug(
+ ):
+ async with self._lock:
+ self._data += chunk
+ del chunk
+ if not self._first_part_received.is_set():
+ self._first_part_received.set()
+ self.logger.debug(
+ "First part received for %s after %.2fs",
+ self.streamdetails.uri,
+ time.time() - time_start,
+ )
+ self._all_data_written.set()
+ self.logger.debug(
"Writing all data for %s done in %.2fs",
self.streamdetails.uri,
time.time() - time_start,
)
- async def _stream_from_cache(self) -> AsyncGenerator[bytes, None]:
- """Stream audio from cachefile (while its still being written)."""
- async with aiofiles.open(self._temp_path, "rb", buffering=0) as _file:
- while True:
- chunk = await _file.read(64000)
- if not chunk and self._all_data_written.is_set():
- break
- elif not chunk:
- await asyncio.sleep(0.05)
- else:
- yield chunk
+ def __init__(self, mass: MusicAssistant, streamdetails: StreamDetails) -> None:
+ """Initialize the StreamCache."""
+ self.mass = mass
+ self.streamdetails = streamdetails
+ self.cache_id = shortuuid.random(20)
+ self.logger = LOGGER.getChild("cache")
+ self._fetch_task: asyncio.Task | None = None
+ self._subscribers: int = 0
+ self._first_part_received = asyncio.Event()
+ self._all_data_written = asyncio.Event()
+ self._data: bytes = b""
+ self._lock: asyncio.Lock = asyncio.Lock()
+ self.org_path: str | None = streamdetails.path
+ self.org_stream_type: StreamType | None = streamdetails.stream_type
+ self.org_extra_input_args: list[str] | None = streamdetails.extra_input_args
+ streamdetails.path = "-"
+ streamdetails.stream_type = StreamType.CACHE
+ streamdetails.can_seek = True
+ streamdetails.allow_seek = True
+ streamdetails.extra_input_args = []
async def _clear(self) -> None:
"""Clear the cache."""
+ self.logger.debug("Cleaning up cache %s", self.streamdetails.uri)
+ if self._fetch_task and not self._fetch_task.done():
+ self._fetch_task.cancel()
+ self._fetch_task = None
self._first_part_received.clear()
self._all_data_written.clear()
- self._fetch_task = None
- await remove_file(self._temp_path)
+ del self._data
+ self._data = b""
def __del__(self) -> None:
- """Ensure the temp file gets cleaned up."""
+ """Ensure the cache data gets cleaned up."""
if self.mass.closing:
- # edge case: MA is closing, clean down the file immediately
- if os.path.isfile(self._temp_path):
- os.remove(self._temp_path)
+ # edge case: MA is closing
return
- self.mass.loop.call_soon_threadsafe(self.mass.create_task, remove_file(self._temp_path))
- self.mass.cancel_timer(f"remove_file_{self._temp_path}")
+ self.mass.cancel_timer(f"remove_file_{self.cache_id}")
+ del self._data
async def crossfade_pcm_parts(
else:
break
else:
- raise MediaNotFoundError(
- f"Unable to retrieve streamdetails for {queue_item.name} ({queue_item.uri})"
- )
+ msg = f"Unable to retrieve streamdetails for {queue_item.name} ({queue_item.uri})"
+ raise MediaNotFoundError(msg)
# work out how to handle radio stream
if (
)
if streamdetails.decryption_key:
- # using intermediate cache is mandatory for decryption
+ # using intermediate cache is mandatory for encrypted streams
streamdetails.enable_cache = True
- # determine if we may use a temporary cache for the audio stream
+ # determine if we may use caching for the audio stream
if streamdetails.enable_cache is None:
+ allow_cache = mass.config.get_raw_core_config_value(
+ "streams", CONF_ALLOW_MEMORY_CACHE, DEFAULT_ALLOW_MEMORY_CACHE
+ )
streamdetails.enable_cache = (
- streamdetails.duration is not None
+ allow_cache
+ and streamdetails.duration is not None
and streamdetails.media_type
in (MediaType.TRACK, MediaType.AUDIOBOOK, MediaType.PODCAST_EPISODE)
and streamdetails.stream_type
in (StreamType.HTTP, StreamType.ENCRYPTED_HTTP, StreamType.CUSTOM, StreamType.HLS)
and streamdetails.audio_format.content_type != ContentType.UNKNOWN
- and await get_tmp_free_space() > 512 * 1024 * 1024
and get_chunksize(streamdetails.audio_format, streamdetails.duration) < 100000000
)
stream_type = streamdetails.stream_type
if stream_type == StreamType.CACHE:
cache = cast(StreamCache, streamdetails.cache)
- audio_source = await cache.acquire()
+ audio_source = cache.get_audio_stream()
elif stream_type == StreamType.CUSTOM:
audio_source = mass.get_provider(streamdetails.provider).get_audio_stream(
streamdetails,
- seek_position=streamdetails.seek_position,
+ seek_position=streamdetails.seek_position if streamdetails.can_seek else 0,
)
elif stream_type == StreamType.ICY:
audio_source = get_icy_radio_stream(mass, streamdetails.path, streamdetails)
and streamdetails.allow_seek
# allow seeking for custom streams,
# but only for custom streams that can't seek theirselves
- and (stream_type != StreamType.CUSTOM or not streamdetails.can_seek)
+ and not (stream_type == StreamType.CUSTOM and streamdetails.can_seek)
):
extra_input_args += ["-ss", str(int(streamdetails.seek_position))]
):
mass.create_task(music_prov.on_streamed(streamdetails))
- # release cache file
- if streamdetails.stream_type == StreamType.CACHE:
- cache = cast(StreamCache, streamdetails.cache)
- cache.release()
-
def create_wave_header(samplerate=44100, channels=2, bitspersample=16, duration=None):
"""Generate a wave header from given params."""
]
if streamdetails.stream_type == StreamType.CACHE:
cache = cast(StreamCache, streamdetails.cache)
- audio_source = await cache.acquire()
+ audio_source = cache.get_audio_stream()
elif streamdetails.stream_type == StreamType.CUSTOM:
audio_source = mass.get_provider(streamdetails.provider).get_audio_stream(
streamdetails,
streamdetails.uri,
loudness,
)
- # release cache file
- if streamdetails.stream_type == StreamType.CACHE:
- cache = cast(StreamCache, streamdetails.cache)
- cache.release()
def _get_normalization_mode(