category="advanced",
required=False,
),
- # ConfigEntry(
- # key=CONF_ALLOW_AUDIO_CACHE,
- # type=ConfigEntryType.STRING,
- # default_value=self.allow_cache_default,
- # options=[
- # ConfigValueOption("Always", "always"),
- # ConfigValueOption("Disabled", "disabled"),
- # ConfigValueOption("Auto", "auto"),
- # ],
- # label="Allow caching of remote/cloudbased audio streams",
- # description="To ensure smooth(er) playback as well as fast seeking, "
- # "Music Assistant can cache audio streams on disk. \n"
- # "On systems with limited diskspace, this can be disabled, "
- # "but may result in less smooth playback or slower seeking.\n\n"
- # "**Always:** Enforce caching of audio streams at all times "
- # "(as long as there is enough free space).\n"
- # "**Disabled:** Never cache audio streams.\n"
- # "**Auto:** Let Music Assistant decide if caching "
- # "should be used on a per-item base.",
- # category="advanced",
- # required=True,
- # ),
)
async def setup(self, config: CoreConfig) -> None:
if request.method != "GET":
return resp
- # work out pcm format based on output format
- pcm_format = AudioFormat(
- content_type=DEFAULT_PCM_FORMAT.content_type,
- sample_rate=output_format.sample_rate,
- # always use f32 internally for extra headroom for filters etc
- bit_depth=DEFAULT_PCM_FORMAT.bit_depth,
- channels=2,
- )
- smart_fades_mode = await self.mass.config.get_player_config_value(
- queue.queue_id, CONF_SMART_FADES_MODE
- )
- standard_crossfade_duration = self.mass.config.get_raw_player_config_value(
- queue.queue_id, CONF_CROSSFADE_DURATION, 10
- )
+ if queue_item.media_type != MediaType.TRACK:
+ # no crossfade on non-tracks
+ smart_fades_mode = SmartFadesMode.DISABLED
+ else:
+ smart_fades_mode = await self.mass.config.get_player_config_value(
+ queue.queue_id, CONF_SMART_FADES_MODE
+ )
+ standard_crossfade_duration = self.mass.config.get_raw_player_config_value(
+ queue.queue_id, CONF_CROSSFADE_DURATION, 10
+ )
if (
smart_fades_mode != SmartFadesMode.DISABLED
and PlayerFeature.GAPLESS_PLAYBACK not in queue_player.supported_features
# crossfade is enabled, use special crossfaded single item stream
# where the crossfade of the next track is present in the stream of
# a single track. This only works if the player supports gapless playback.
- audio_input = self.get_queue_item_stream_with_smartfade(
- queue_item=queue_item,
- pcm_format=pcm_format,
- session_id=session_id,
- smart_fades_mode=smart_fades_mode,
- standard_crossfade_duration=standard_crossfade_duration,
+ # work out pcm format based on output format
+ pcm_format = AudioFormat(
+ content_type=DEFAULT_PCM_FORMAT.content_type,
+ sample_rate=output_format.sample_rate,
+ # always use f32 internally for extra headroom for filters etc
+ bit_depth=DEFAULT_PCM_FORMAT.bit_depth,
+ channels=2,
+ )
+ audio_input = get_ffmpeg_stream(
+ audio_input=self.get_queue_item_stream_with_smartfade(
+ queue_item=queue_item,
+ pcm_format=pcm_format,
+ session_id=session_id,
+ smart_fades_mode=smart_fades_mode,
+ standard_crossfade_duration=standard_crossfade_duration,
+ ),
+ input_format=pcm_format,
+ output_format=output_format,
+ filter_params=get_player_filter_params(
+ self.mass, queue_player.player_id, pcm_format, output_format
+ ),
)
else:
+ # no crossfade, just a regular single item stream
+ # no need to convert to pcm first, request output format directly
audio_input = self.get_queue_item_stream(
queue_item=queue_item,
- pcm_format=pcm_format,
+ output_format=output_format,
+ filter_params=get_player_filter_params(
+ self.mass,
+ queue_player.player_id,
+ queue_item.streamdetails.audio_format,
+ output_format,
+ ),
)
- async for chunk in get_ffmpeg_stream(
- audio_input=audio_input,
- input_format=pcm_format,
- output_format=output_format,
- filter_params=get_player_filter_params(
- self.mass, queue_player.player_id, pcm_format, output_format
- ),
- chunk_size=get_chunksize(output_format),
- ):
+ async for chunk in audio_input:
try:
await resp.write(chunk)
except (BrokenPipeError, ConnectionResetError, ConnectionError):
pcm_sample_size = int(
pcm_format.sample_rate * (pcm_format.bit_depth / 8) * pcm_format.channels
)
- smart_fades_mode = await self.mass.config.get_player_config_value(
- queue.queue_id, CONF_SMART_FADES_MODE
- )
- standard_crossfade_duration = self.mass.config.get_raw_player_config_value(
- queue.queue_id, CONF_CROSSFADE_DURATION, 10
- )
+ if start_queue_item.media_type != MediaType.TRACK:
+ # no crossfade on non-tracks
+ # NOTE that we shouldn't be using flow mode for non-tracks at all,
+ # but just to be sure, we specifically disable crossfade here
+ smart_fades_mode = SmartFadesMode.DISABLED
+ standard_crossfade_duration = 0
+ else:
+ smart_fades_mode = await self.mass.config.get_player_config_value(
+ queue.queue_id, CONF_SMART_FADES_MODE
+ )
+ standard_crossfade_duration = self.mass.config.get_raw_player_config_value(
+ queue.queue_id, CONF_CROSSFADE_DURATION, 10
+ )
self.logger.info(
"Start Queue Flow stream for Queue %s - crossfade: %s %s",
queue.display_name,
# handle incoming audio chunks
async for chunk in self.get_queue_item_stream(
queue_track,
- pcm_format=pcm_format,
+ output_format=pcm_format,
):
# buffer size needs to be big enough to include the crossfade part
req_buffer_size = (
async def get_queue_item_stream(
self,
queue_item: QueueItem,
- pcm_format: AudioFormat,
+ output_format: AudioFormat,
+ filter_params: list[str] | None = None,
) -> AsyncGenerator[bytes, None]:
- """Get the audio stream for a single queue item as raw PCM audio."""
+ """Get the audio stream for a single queue item."""
# collect all arguments for ffmpeg
streamdetails = queue_item.streamdetails
assert streamdetails
- filter_params = []
+ filter_params = filter_params or []
# handle volume normalization
gain_correct: float | None = None
filter_params.append(f"volume={gain_correct}dB")
streamdetails.volume_normalization_gain_correct = gain_correct
- # if streamdetails.media_type == MediaType.RADIO or not streamdetails.duration:
- # # pad some silence before the radio/live stream starts to create some headroom
- # # for radio stations (or other live streams) that do not provide any look ahead buffer
- # # without this, some radio streams jitter a lot,
- # # especially with dynamic normalization,
- # # if the stream does not provide a look ahead buffer
- # async for silence in get_silence(4, pcm_format):
- # yield silence
- # del silence
-
first_chunk_received = False
async for chunk in get_media_stream(
self.mass,
streamdetails=streamdetails,
- pcm_format=pcm_format,
+ output_format=output_format,
filter_params=filter_params,
):
if not first_chunk_received:
ProviderUnavailableError,
)
from music_assistant_models.media_items import AudioFormat
+from music_assistant_models.streamdetails import MultiPartPath
from music_assistant.constants import (
- CONF_ALLOW_AUDIO_CACHE,
CONF_ENTRY_OUTPUT_LIMITER,
CONF_OUTPUT_CHANNELS,
CONF_VOLUME_NORMALIZATION,
from .ffmpeg import FFMpeg, get_ffmpeg_stream
from .playlists import IsHLSPlaylist, PlaylistItem, fetch_playlist, parse_m3u
from .process import AsyncProcess, communicate
-from .util import detect_charset, has_enough_space
+from .util import detect_charset
if TYPE_CHECKING:
from music_assistant_models.config_entries import CoreConfig, PlayerConfig
SLOW_PROVIDERS = ("tidal", "ytmusic", "apple_music")
-CACHE_CATEGORY_AUDIO_CACHE: Final[int] = 99
CACHE_CATEGORY_RESOLVED_RADIO_URL: Final[int] = 100
CACHE_PROVIDER: Final[str] = "audio"
-CACHE_FILES_IN_USE: set[str] = set()
-
-
-class StreamCache:
- """
- StreamCache.
-
- Basic class to handle caching of audio streams to a (semi) temporary file.
- Useful in case of slow or unreliable network connections, faster seeking,
- or when the audio stream is slow itself.
- """
-
- def __init__(self, mass: MusicAssistant, streamdetails: StreamDetails) -> None:
- """Initialize the StreamCache."""
- self.mass = mass
- self.streamdetails = streamdetails
- self.logger = LOGGER.getChild("cache")
- self._cache_file: str | None = None
- self._fetch_task: asyncio.Task[None] | None = None
- self._subscribers: int = 0
- self._first_part_received = asyncio.Event()
- self._all_data_written: bool = False
- self._stream_error: str | None = None
- self.org_path: str | None = streamdetails.path
- self.org_stream_type: StreamType | None = streamdetails.stream_type
- self.org_extra_input_args: list[str] | None = streamdetails.extra_input_args
- self.org_audio_format = streamdetails.audio_format
- streamdetails.audio_format = AudioFormat(
- content_type=ContentType.NUT,
- codec_type=streamdetails.audio_format.codec_type,
- sample_rate=streamdetails.audio_format.sample_rate,
- bit_depth=streamdetails.audio_format.bit_depth,
- channels=streamdetails.audio_format.channels,
- )
- streamdetails.path = "-"
- streamdetails.stream_type = StreamType.CACHE
- streamdetails.can_seek = True
- streamdetails.allow_seek = True
- streamdetails.extra_input_args = []
-
- async def create(self) -> None:
- """Create the cache file (if needed)."""
- if self._cache_file is None:
- if cached_cache_path := await self.mass.cache.get(
- key=self.streamdetails.uri,
- provider=CACHE_PROVIDER,
- category=CACHE_CATEGORY_AUDIO_CACHE,
- ):
- # we have a mapping stored for this uri, prefer that
- self._cache_file = cached_cache_path
- assert self._cache_file is not None # for type checking
- if await asyncio.to_thread(os.path.exists, self._cache_file):
- # cache file already exists from a previous session,
- # we can simply use that, there is nothing to create
- CACHE_FILES_IN_USE.add(self._cache_file)
- self._all_data_written = True
- return
- else:
- # create new cache file
- cache_id = shortuuid.random(30)
- self._cache_file = cache_file = os.path.join(
- self.mass.streams.audio_cache_dir, cache_id
- )
- await self.mass.cache.set(
- key=self.streamdetails.uri,
- data=cache_file,
- provider=CACHE_PROVIDER,
- category=CACHE_CATEGORY_AUDIO_CACHE,
- )
- # mark file as in-use to prevent it being deleted
- CACHE_FILES_IN_USE.add(self._cache_file)
- # start fetch task if its not already running
- if self._fetch_task is None:
- self._fetch_task = self.mass.create_task(self._create_cache_file())
- # wait until the first part of the file is received
- await self._first_part_received.wait()
- if self._stream_error:
- # an error occurred while creating the cache file
- # remove the cache file and raise an error
- raise AudioError(self._stream_error)
-
- def release(self) -> None:
- """Release the cache file."""
- self._subscribers -= 1
- if self._subscribers <= 0:
- assert self._cache_file is not None # for type checking
- CACHE_FILES_IN_USE.discard(self._cache_file)
-
- async def get_audio_stream(self) -> str | AsyncGenerator[bytes, None]:
- """
- Get the cached audio stream.
-
- Returns a string with the path of the cachefile if the file is ready.
- If the file is not yet ready, it will return an async generator that will
- stream the (intermediate) audio data from the cache file.
- """
- self._subscribers += 1
- assert self._cache_file is not None # type guard
- # mark file as in-use to prevent it being deleted
- CACHE_FILES_IN_USE.add(self._cache_file)
-
- async def _stream_from_cache() -> AsyncGenerator[bytes, None]:
- chunksize = get_chunksize(self.streamdetails.audio_format, 1)
- wait_loops = 0
- assert self._cache_file is not None # type guard
- async with aiofiles.open(self._cache_file, "rb") as file:
- while wait_loops < 2000:
- chunk = await file.read(chunksize)
- if chunk:
- yield chunk
- await asyncio.sleep(0) # yield to eventloop
- del chunk
- elif self._all_data_written:
- # reached EOF
- break
- else:
- # data is not yet available, wait a bit
- await asyncio.sleep(0.05)
- # prevent an infinite loop in case of an error
- wait_loops += 1
-
- if self._all_data_written:
- # cache file is ready
- return self._cache_file
-
- # cache file does not exist at all (or is still being written)
- await self.create()
- return _stream_from_cache()
-
- async def _create_cache_file(self) -> None:
- time_start = time.time()
- self.logger.debug("Creating audio cache for %s", self.streamdetails.uri)
- assert self._cache_file is not None # for type checking
- CACHE_FILES_IN_USE.add(self._cache_file)
- self._first_part_received.clear()
- self._all_data_written = False
- extra_input_args = ["-y", *(self.org_extra_input_args or [])]
- audio_source: AsyncGenerator[bytes, None] | str | int
- if self.org_stream_type == StreamType.CUSTOM:
- provider = self.mass.get_provider(self.streamdetails.provider)
- if TYPE_CHECKING: # avoid circular import
- assert isinstance(provider, MusicProvider)
- audio_source = provider.get_audio_stream(
- self.streamdetails,
- )
- elif self.org_stream_type == StreamType.ICY:
- raise NotImplementedError("Caching of this streamtype is not supported!")
- elif self.org_stream_type == StreamType.HLS:
- if self.streamdetails.media_type == MediaType.RADIO:
- raise NotImplementedError("Caching of this streamtype is not supported!")
- assert self.org_path is not None # for type checking
- substream = await get_hls_substream(self.mass, self.org_path)
- audio_source = substream.path
- elif self.org_stream_type == StreamType.ENCRYPTED_HTTP:
- assert self.org_path is not None # for type checking
- assert self.streamdetails.decryption_key is not None # for type checking
- audio_source = self.org_path
- extra_input_args += ["-decryption_key", self.streamdetails.decryption_key]
- elif self.org_stream_type == StreamType.MULTI_FILE:
- audio_source = get_multi_file_stream(self.mass, self.streamdetails)
- else:
- assert self.org_path is not None # for type checking
- audio_source = self.org_path
-
- # we always use ffmpeg to fetch the original audio source
- # this may feel a bit redundant, but it's the most reliable way to fetch the audio
- # because ffmpeg has all logic to handle different audio formats, codecs, etc.
- # and it also accounts for complicated cases such as encrypted streams or
- # m4a/mp4 streams with the moov atom at the end of the file.
- # ffmpeg will produce a lossless copy of the original codec.
- ffmpeg_proc = FFMpeg(
- audio_input=audio_source,
- input_format=self.org_audio_format,
- output_format=self.streamdetails.audio_format,
- extra_input_args=extra_input_args,
- audio_output=self._cache_file,
- collect_log_history=True,
- )
- try:
- await ffmpeg_proc.start()
- # wait until the first data is written to the cache file
- while ffmpeg_proc.returncode is None:
- await asyncio.sleep(0.1)
- if not await asyncio.to_thread(os.path.exists, self._cache_file):
- continue
- if await asyncio.to_thread(os.path.getsize, self._cache_file) > 64000:
- break
-
- # set 'first part received' event to signal that the first part of the file is ready
- # this is useful for the get_audio_stream method to know when it can start streaming
- # we do guard for the returncode here, because if ffmpeg exited abnormally, we should
- # not signal that the first part is ready
- if ffmpeg_proc.returncode in (None, 0):
- self._first_part_received.set()
- self.logger.debug(
- "First part received for %s after %.2fs",
- self.streamdetails.uri,
- time.time() - time_start,
- )
- # wait until ffmpeg is done
- await ffmpeg_proc.wait()
-
- # raise an error if ffmpeg exited with a non-zero code
- if ffmpeg_proc.returncode != 0:
- ffmpeg_proc.logger.warning("\n".join(ffmpeg_proc.log_history))
- raise AudioError(f"FFMpeg error {ffmpeg_proc.returncode}")
-
- # set 'all data written' event to signal that the entire file is ready
- self._all_data_written = True
- self.logger.debug(
- "Writing all data for %s done in %.2fs",
- self.streamdetails.uri,
- time.time() - time_start,
- )
- except BaseException as err:
- self.logger.error("Error while creating cache for %s: %s", self.streamdetails.uri, err)
- # make sure that the (corrupted/incomplete) cache file is removed
- await self._remove_cache_file()
- # unblock the waiting tasks by setting the event
- # this will allow the tasks to continue and handle the error
- self._stream_error = str(err) or err.__qualname__ # type: ignore [attr-defined]
- self._first_part_received.set()
- finally:
- await ffmpeg_proc.close()
-
- async def _remove_cache_file(self) -> None:
- self._first_part_received.clear()
- self._all_data_written = False
- self._fetch_task = None
- assert self._cache_file is not None # for type checking
- await remove_file(self._cache_file)
+STREAMDETAILS_EXPIRATION: Final[int] = 60 * 15 # 15 minutes
async def crossfade_pcm_parts(
This is called just-in-time when a PlayerQueue wants a MediaItem to be played.
Do not try to request streamdetails too much in advance as this is expiring data.
"""
- BYPASS_THROTTLER.set(True)
time_start = time.time()
LOGGER.debug("Getting streamdetails for %s", queue_item.uri)
if seek_position and (queue_item.media_type == MediaType.RADIO or not queue_item.duration):
raise MediaNotFoundError(
f"Unable to retrieve streamdetails for {queue_item.name} ({queue_item.uri})"
)
- if queue_item.streamdetails and (utc() - queue_item.streamdetails.created_at).seconds < 1800:
+ if (
+ queue_item.streamdetails
+ and (utc() - queue_item.streamdetails.created_at).seconds < STREAMDETAILS_EXPIRATION
+ ):
# already got a fresh/unused (or cached) streamdetails
- # we assume that the streamdetails are valid for max 30 minutes
+ # we assume that the streamdetails are valid for max STREAMDETAILS_EXPIRATION seconds
streamdetails = queue_item.streamdetails
else:
# retrieve streamdetails from provider
continue # provider not available ?
# get streamdetails from provider
try:
+ BYPASS_THROTTLER.set(True)
streamdetails = await music_prov.get_stream_details(
prov_media.item_id, media_item.media_type
)
LOGGER.warning(str(err))
else:
break
+ finally:
+ BYPASS_THROTTLER.set(False)
else:
msg = f"Unable to retrieve streamdetails for {queue_item.name} ({queue_item.uri})"
raise MediaNotFoundError(msg)
streamdetails.stream_type in (StreamType.ICY, StreamType.HLS, StreamType.HTTP)
and streamdetails.media_type == MediaType.RADIO
):
- assert streamdetails.path is not None # for type checking
+ assert isinstance(streamdetails.path, str) # for type checking
resolved_url, stream_type = await resolve_radio_stream(mass, streamdetails.path)
streamdetails.path = resolved_url
streamdetails.stream_type = stream_type
queue_item.uri,
int((time.time() - time_start) * 1000),
)
-
- # determine if we may use caching for the audio stream
- if streamdetails.enable_cache is None:
- streamdetails.enable_cache = await _is_cache_allowed(mass, streamdetails)
-
- # handle temporary cache support of audio stream
- if streamdetails.enable_cache:
- if streamdetails.cache is None:
- streamdetails.cache = StreamCache(mass, streamdetails)
- else:
- streamdetails.cache = cast("StreamCache", streamdetails.cache)
- # create cache (if needed) and wait until the cache is available
- await streamdetails.cache.create()
- LOGGER.debug(
- "streamdetails cache ready for %s in %s milliseconds",
- queue_item.uri,
- int((time.time() - time_start) * 1000),
- )
-
return streamdetails
-async def _is_cache_allowed(mass: MusicAssistant, streamdetails: StreamDetails) -> bool:
- """Check if caching is allowed for the given streamdetails."""
- if streamdetails.media_type not in (
- MediaType.TRACK,
- MediaType.AUDIOBOOK,
- MediaType.PODCAST_EPISODE,
- ):
- return False
- if streamdetails.stream_type in (StreamType.ICY, StreamType.LOCAL_FILE, StreamType.UNKNOWN):
- return False
- if streamdetails.stream_type == StreamType.LOCAL_FILE:
- # no need to cache local files
- return False
- allow_cache = mass.config.get_raw_core_config_value(
- "streams", CONF_ALLOW_AUDIO_CACHE, mass.streams.allow_cache_default
- )
- if allow_cache == "disabled":
- return False
- if not await has_enough_space(mass.streams.audio_cache_dir, 5):
- return False
- if allow_cache == "always":
- return True
- # auto mode
- if streamdetails.stream_type == StreamType.ENCRYPTED_HTTP:
- # always prefer cache for encrypted streams
- return True
- if not streamdetails.duration:
- # we can't determine filesize without duration so play it safe and dont allow cache
- return False
- estimated_filesize = get_chunksize(streamdetails.audio_format, streamdetails.duration)
- if streamdetails.stream_type == StreamType.MULTI_FILE:
- # prefer cache to speedup multi-file streams
- # (if total filesize smaller than 2GB)
- max_filesize = 2 * 1024 * 1024 * 1024
- elif streamdetails.stream_type == StreamType.CUSTOM:
- # prefer cache for custom streams (to speedup seeking)
- max_filesize = 250 * 1024 * 1024 # 250MB
- elif streamdetails.stream_type == StreamType.HLS:
- # prefer cache for HLS streams (to speedup seeking)
- max_filesize = 250 * 1024 * 1024 # 250MB
- elif streamdetails.media_type in (
- MediaType.AUDIOBOOK,
- MediaType.PODCAST_EPISODE,
- ):
- # prefer cache for audiobooks and episodes (to speedup seeking)
- max_filesize = 2 * 1024 * 1024 * 1024 # 2GB
- elif streamdetails.provider in SLOW_PROVIDERS:
- # prefer cache for slow providers
- max_filesize = 2 * 1024 * 1024 * 1024 # 2GB
- else:
- max_filesize = 50 * 1024 * 1024
-
- return estimated_filesize < max_filesize
-
-
async def get_media_stream(
mass: MusicAssistant,
streamdetails: StreamDetails,
- pcm_format: AudioFormat,
+ output_format: AudioFormat,
filter_params: list[str] | None = None,
) -> AsyncGenerator[bytes, None]:
- """Get PCM audio stream for given media details."""
+ """Get audio stream for given media details."""
logger = LOGGER.getChild("media_stream")
logger.log(VERBOSE_LOG_LEVEL, "Starting media stream for %s", streamdetails.uri)
extra_input_args = streamdetails.extra_input_args or []
- strip_silence_begin = streamdetails.strip_silence_begin
- strip_silence_end = streamdetails.strip_silence_end
if filter_params is None:
filter_params = []
if streamdetails.fade_in:
filter_params.append("afade=type=in:start_time=0:duration=3")
- strip_silence_begin = False
+ seek_position = streamdetails.seek_position
# work out audio source for these streamdetails
+ audio_source: str | AsyncGenerator[bytes, None]
stream_type = streamdetails.stream_type
- if stream_type == StreamType.CACHE:
- cache = cast("StreamCache", streamdetails.cache)
- audio_source = await cache.get_audio_stream()
- elif stream_type == StreamType.MULTI_FILE:
- audio_source = get_multi_file_stream(mass, streamdetails)
- elif stream_type == StreamType.CUSTOM:
+ if stream_type == StreamType.CUSTOM:
music_prov = mass.get_provider(streamdetails.provider)
if TYPE_CHECKING: # avoid circular import
assert isinstance(music_prov, MusicProvider)
audio_source = music_prov.get_audio_stream(
streamdetails,
- seek_position=streamdetails.seek_position if streamdetails.can_seek else 0,
+ seek_position=seek_position if streamdetails.can_seek else 0,
)
+ seek_position = 0 if streamdetails.can_seek else seek_position
elif stream_type == StreamType.ICY:
- assert streamdetails.path is not None # for type checking
+ assert isinstance(streamdetails.path, str) # for type checking
audio_source = get_icy_radio_stream(mass, streamdetails.path, streamdetails)
+ seek_position = 0 # seeking not possible on radio streams
elif stream_type == StreamType.HLS:
- assert streamdetails.path is not None # for type checking
+ assert isinstance(streamdetails.path, str) # for type checking
substream = await get_hls_substream(mass, streamdetails.path)
audio_source = substream.path
if streamdetails.media_type == MediaType.RADIO:
# with ffmpeg, where they just stop after some minutes,
# so we tell ffmpeg to loop around in this case.
extra_input_args += ["-stream_loop", "-1", "-re"]
- elif stream_type == StreamType.ENCRYPTED_HTTP:
- assert streamdetails.path is not None # for type checking
- assert streamdetails.decryption_key is not None # for type checking
- audio_source = streamdetails.path
- extra_input_args += ["-decryption_key", streamdetails.decryption_key]
else:
- assert streamdetails.path is not None # for type checking
- audio_source = streamdetails.path
+ # all other stream types (HTTP, FILE, etc)
+ if stream_type == StreamType.ENCRYPTED_HTTP:
+ assert streamdetails.decryption_key is not None # for type checking
+ extra_input_args += ["-decryption_key", streamdetails.decryption_key]
+ if isinstance(streamdetails.path, list):
+ # multi part stream
+ audio_source = get_multi_file_stream(mass, streamdetails, seek_position)
+ seek_position = 0 # handled by get_multi_file_stream
+ else:
+ # regular single file/url stream
+ assert isinstance(streamdetails.path, str) # for type checking
+ audio_source = streamdetails.path
# handle seek support
- if (
- streamdetails.seek_position
- and streamdetails.duration
- and streamdetails.allow_seek
- # allow seeking for custom streams,
- # but only for custom streams that can't seek theirselves
- and not (stream_type == StreamType.CUSTOM and streamdetails.can_seek)
- ):
- extra_input_args += ["-ss", str(int(streamdetails.seek_position))]
+ if seek_position and streamdetails.duration and streamdetails.allow_seek:
+ extra_input_args += ["-ss", str(int(seek_position))]
bytes_sent = 0
- chunk_number = 0
- buffer: bytes = b""
finished = False
cancelled = False
+ first_chunk_received = False
ffmpeg_proc = FFMpeg(
audio_input=audio_source,
input_format=streamdetails.audio_format,
- output_format=pcm_format,
+ output_format=output_format,
filter_params=filter_params,
extra_input_args=extra_input_args,
collect_log_history=True,
streamdetails.uri,
streamdetails.stream_type,
streamdetails.volume_normalization_mode,
- pcm_format.content_type.value,
+ output_format.content_type.value,
ffmpeg_proc.proc.pid,
)
- # use 1 second chunks
- chunk_size = pcm_format.pcm_sample_size
+ stream_start = mass.loop.time()
+
+ chunk_size = get_chunksize(output_format, 1)
async for chunk in ffmpeg_proc.iter_chunked(chunk_size):
- if chunk_number == 1:
+ if not first_chunk_received:
# At this point ffmpeg has started and should now know the codec used
# for encoding the audio.
+ first_chunk_received = True
streamdetails.audio_format.codec_type = ffmpeg_proc.input_format.codec_type
-
- # for non-tracks we just yield all chunks directly
- if streamdetails.media_type != MediaType.TRACK:
- yield chunk
- bytes_sent += len(chunk)
- continue
-
- chunk_number += 1
- # determine buffer size dynamically
- if chunk_number < 5 and strip_silence_begin:
- req_buffer_size = int(pcm_format.pcm_sample_size * 5)
- elif chunk_number > 240 and strip_silence_end:
- req_buffer_size = int(pcm_format.pcm_sample_size * 10)
- elif chunk_number > 120 and strip_silence_end:
- req_buffer_size = int(pcm_format.pcm_sample_size * 8)
- elif chunk_number > 60:
- req_buffer_size = int(pcm_format.pcm_sample_size * 6)
- elif chunk_number > 20 and strip_silence_end:
- req_buffer_size = int(pcm_format.pcm_sample_size * 4)
- else:
- req_buffer_size = pcm_format.pcm_sample_size * 2
-
- # always append to buffer
- buffer += chunk
- del chunk
-
- if len(buffer) < req_buffer_size:
- # buffer is not full enough, move on
- continue
-
- if strip_silence_begin:
- # strip silence from begin of audio
- strip_silence_begin = False
- chunk = await strip_silence( # noqa: PLW2901
- mass, buffer, pcm_format=pcm_format
+ logger.debug(
+ "First chunk received after %s seconds",
+ mass.loop.time() - stream_start,
)
- bytes_sent += len(chunk)
- yield chunk
- buffer = b""
- continue
-
- #### OTHER: enough data in buffer, feed to output
- while len(buffer) > req_buffer_size:
- yield buffer[: pcm_format.pcm_sample_size]
- bytes_sent += pcm_format.pcm_sample_size
- buffer = buffer[pcm_format.pcm_sample_size :]
+ yield chunk
+ bytes_sent += len(chunk)
# end of audio/track reached
logger.log(VERBOSE_LOG_LEVEL, "End of stream reached.")
- if strip_silence_end and buffer:
- # strip silence from end of audio
- buffer = await strip_silence(
- mass,
- buffer,
- pcm_format=pcm_format,
- reverse=True,
- )
- # send remaining bytes in buffer
- bytes_sent += len(buffer)
- yield buffer
- del buffer
# wait until stderr also completed reading
await ffmpeg_proc.wait_with_timeout(5)
if bytes_sent == 0:
# always ensure close is called which also handles all cleanup
await ffmpeg_proc.close()
# try to determine how many seconds we've streamed
- seconds_streamed = bytes_sent / pcm_format.pcm_sample_size if bytes_sent else 0
+ if output_format.content_type.is_pcm():
+ # for pcm output we can calculate this easily
+ seconds_streamed = bytes_sent / output_format.pcm_sample_size if bytes_sent else 0
+ streamdetails.seconds_streamed = seconds_streamed
+ # store accurate duration
+ if finished and not streamdetails.seek_position and seconds_streamed:
+ streamdetails.duration = int(seconds_streamed)
+ else:
+ # this is a less accurate estimate for compressed audio
+ seconds_streamed = bytes_sent / get_chunksize(output_format, 1)
logger.debug(
"stream %s (with code %s) for %s - seconds streamed: %s",
"cancelled" if cancelled else "finished" if finished else "aborted",
streamdetails.uri,
seconds_streamed,
)
- streamdetails.seconds_streamed = seconds_streamed
- # store accurate duration
- if finished and not streamdetails.seek_position and seconds_streamed:
- streamdetails.duration = int(seconds_streamed)
-
- # release cache if needed
- if cache := streamdetails.cache:
- cache = cast("StreamCache", streamdetails.cache)
- cache.release()
# parse loudnorm data if we have that collected (and enabled)
if (
media_type=streamdetails.media_type,
)
)
- elif (
+ # schedule loudness analysis if needed
+ if (
streamdetails.loudness is None
and streamdetails.volume_normalization_mode
not in (
VolumeNormalizationMode.DISABLED,
VolumeNormalizationMode.FIXED_GAIN,
)
- and (finished or (seconds_streamed >= 30))
+ and (finished or (seconds_streamed >= 300))
):
# dynamic mode not allowed and no measurement known, we need to analyze the audio
# add background task to start analyzing the audio
if cache := await mass.cache.get(
key=url, provider=CACHE_PROVIDER, category=CACHE_CATEGORY_RESOLVED_RADIO_URL
):
- return cast("tuple[str, StreamType]", cache)
+ if TYPE_CHECKING: # for type checking
+ cache = cast("tuple[str, str]", cache)
+ return (cache[0], StreamType(cache[1]))
stream_type = StreamType.HTTP
resolved_url = url
timeout = ClientTimeout(total=0, connect=10, sock_read=5)
Arguments:
seek_position: The position to seek to in seconds
"""
- files_list: list[str] = streamdetails.data
+ files_list: list[str] = []
+ if not isinstance(streamdetails.path, list):
+ raise InvalidDataError("Multi-file streamdetails requires a list of MultiPartPath")
+ skipped_duration = 0.0
+ for part in streamdetails.path:
+ if not isinstance(part, MultiPartPath):
+ raise InvalidDataError("Multi-file streamdetails requires a list of MultiPartPath")
+ if seek_position and part.duration and (skipped_duration + part.duration) < seek_position:
+ skipped_duration += part.duration
+ continue
+ files_list.append(part.path)
+ if seek_position:
+ seek_position -= int(skipped_duration)
+
# concat input files
temp_file = f"/tmp/{shortuuid.random(20)}.txt" # noqa: S108
async with aiofiles.open(temp_file, "w") as f:
if TYPE_CHECKING: # avoid circular import
assert isinstance(music_prov, MusicProvider)
streamdetails = await music_prov.get_stream_details(item_id, media_type)
-
- audio_input: AsyncGenerator[bytes, None] | str
- if streamdetails.stream_type == StreamType.CUSTOM:
- audio_input = music_prov.get_audio_stream(streamdetails, 30)
- else:
- assert streamdetails.path is not None # for type checking
- audio_input = streamdetails.path
- async for chunk in get_ffmpeg_stream(
- audio_input=audio_input,
- input_format=streamdetails.audio_format,
+ streamdetails.extra_input_args += ["-t", "30"] # cut after 30 seconds
+ async for chunk in get_media_stream(
+ mass=mass,
+ streamdetails=streamdetails,
output_format=AudioFormat(content_type=ContentType.AAC),
- extra_input_args=["-to", "30"],
):
yield chunk
def get_chunksize(
fmt: AudioFormat,
- seconds: int = 1,
+ seconds: float = 1,
) -> int:
"""Get a default chunk/file size for given contenttype in bytes."""
pcm_size = int(fmt.sample_rate * (fmt.bit_depth / 8) * fmt.channels * seconds)
"-t",
"600",
]
- if streamdetails.stream_type == StreamType.CACHE:
- cache = cast("StreamCache", streamdetails.cache)
- audio_source = await cache.get_audio_stream()
- elif streamdetails.stream_type == StreamType.MULTI_FILE:
- audio_source = get_multi_file_stream(mass, streamdetails)
- elif streamdetails.stream_type == StreamType.CUSTOM:
+ # work out audio source for these streamdetails
+ stream_type = streamdetails.stream_type
+ audio_source: str | AsyncGenerator[bytes, None]
+ if stream_type == StreamType.CUSTOM:
music_prov = mass.get_provider(streamdetails.provider)
if TYPE_CHECKING: # avoid circular import
assert isinstance(music_prov, MusicProvider)
- audio_source = music_prov.get_audio_stream(
- streamdetails,
- )
- elif streamdetails.stream_type == StreamType.HLS:
- assert streamdetails.path is not None # for type checking
+ audio_source = music_prov.get_audio_stream(streamdetails)
+ elif stream_type == StreamType.ICY:
+ assert isinstance(streamdetails.path, str) # for type checking
+ audio_source = get_icy_radio_stream(mass, streamdetails.path, streamdetails)
+ elif stream_type == StreamType.HLS:
+ assert isinstance(streamdetails.path, str) # for type checking
substream = await get_hls_substream(mass, streamdetails.path)
audio_source = substream.path
- elif streamdetails.stream_type == StreamType.ENCRYPTED_HTTP:
- assert streamdetails.path is not None # for type checking
- assert streamdetails.decryption_key is not None # for type checking
- audio_source = streamdetails.path
- extra_input_args += ["-decryption_key", streamdetails.decryption_key]
else:
- assert streamdetails.path is not None # for type checking
- audio_source = streamdetails.path
+ # all other stream types (HTTP, FILE, etc)
+ if stream_type == StreamType.ENCRYPTED_HTTP:
+ assert streamdetails.decryption_key is not None # for type checking
+ extra_input_args += ["-decryption_key", streamdetails.decryption_key]
+ if isinstance(streamdetails.path, list):
+ # multi part stream - just use a single file for the measurement
+ audio_source = streamdetails.path[1].path
+ else:
+ # regular single file/url stream
+ assert isinstance(streamdetails.path, str) # for type checking
+ audio_source = streamdetails.path
# calculate BS.1770 R128 integrated loudness with ffmpeg
async with FFMpeg(
streamdetails.uri,
loudness,
)
- finally:
- # release cache if needed
- if cache := streamdetails.cache:
- cache = cast("StreamCache", streamdetails.cache)
- cache.release()
def _get_normalization_mode(