streamdetails.seek_position = 0
# collect all arguments for ffmpeg
filter_params = []
+ extra_input_args = []
if streamdetails.target_loudness is not None:
# add loudnorm filters
filter_rule = f"loudnorm=I={streamdetails.target_loudness}:TP=-1.5:LRA=11"
streamdetails,
seek_position=streamdetails.seek_position,
)
- elif streamdetails.stream_type == StreamType.HLS:
- audio_source = get_hls_stream(self.mass, streamdetails.path, streamdetails)
elif streamdetails.stream_type == StreamType.ICY:
audio_source = get_icy_stream(self.mass, streamdetails.path, streamdetails)
+ elif streamdetails.stream_type == StreamType.HLS:
+ audio_source = get_hls_stream(
+ self.mass, streamdetails.path, streamdetails, streamdetails.seek_position
+ )
else:
audio_source = streamdetails.path
- extra_input_args = []
- if streamdetails.seek_position and streamdetails.stream_type != StreamType.CUSTOM:
- extra_input_args += ["-ss", str(int(streamdetails.seek_position))]
+ if streamdetails.seek_position:
+ extra_input_args += ["-ss", str(int(streamdetails.seek_position))]
logger.debug("start media stream for: %s", streamdetails.uri)
bytes_sent = 0
finished = False
- async with FFMpeg(
- audio_input=audio_source,
- input_format=streamdetails.audio_format,
- output_format=pcm_format,
- filter_params=filter_params,
- extra_input_args=[
- *extra_input_args,
- # we criple ffmpeg a bit on purpose with the filter_threads
- # option so it doesn't consume all cpu when calculating loudnorm
- "-filter_threads",
- "2",
- ],
- collect_log_history=True,
- logger=logger,
- ) as ffmpeg_proc:
- try:
+ try:
+ async with FFMpeg(
+ audio_input=audio_source,
+ input_format=streamdetails.audio_format,
+ output_format=pcm_format,
+ filter_params=filter_params,
+ extra_input_args=[
+ *extra_input_args,
+ # we cripple ffmpeg a bit on purpose with the filter_threads
+ # option so it doesn't consume all cpu when calculating loudnorm
+ "-filter_threads",
+ "2",
+ ],
+ collect_log_history=True,
+ logger=logger,
+ ) as ffmpeg_proc:
async for chunk in ffmpeg_proc.iter_any(pcm_format.pcm_sample_size):
bytes_sent += len(chunk)
yield chunk
del chunk
finished = True
- finally:
- if finished:
- await ffmpeg_proc.wait()
- else:
- await ffmpeg_proc.close()
-
- # try to determine how many seconds we've streamed
- seconds_streamed = bytes_sent / pcm_format.pcm_sample_size if bytes_sent else 0
- logger.debug(
- "stream %s (with code %s) for %s - seconds streamed: %s",
- "finished" if finished else "aborted",
- ffmpeg_proc.returncode,
- streamdetails.uri,
- seconds_streamed,
- )
- if seconds_streamed:
- streamdetails.seconds_streamed = seconds_streamed
- # store accurate duration
- if finished and not streamdetails.seek_position and seconds_streamed:
- streamdetails.duration = seconds_streamed
-
- # parse loudnorm data if we have that collected
- if loudness_details := parse_loudnorm(" ".join(ffmpeg_proc.log_history)):
- required_seconds = 600 if streamdetails.media_type == MediaType.RADIO else 120
- if finished or (seconds_streamed >= required_seconds):
- logger.debug(
- "Loudness measurement for %s: %s",
- streamdetails.uri,
- loudness_details,
- )
- streamdetails.loudness = loudness_details
- self.mass.create_task(
- self.mass.music.set_track_loudness(
- streamdetails.item_id, streamdetails.provider, loudness_details
- )
- )
- # report playback
- if finished or seconds_streamed > 30:
+ finally:
+ if finished and not ffmpeg_proc.closed:
+ await asyncio.wait_for(ffmpeg_proc.wait(), 60)
+ elif not ffmpeg_proc.closed:
+ await ffmpeg_proc.close()
+
+ # try to determine how many seconds we've streamed
+ seconds_streamed = bytes_sent / pcm_format.pcm_sample_size if bytes_sent else 0
+ logger.debug(
+ "stream %s (with code %s) for %s - seconds streamed: %s",
+ "finished" if finished else "aborted",
+ ffmpeg_proc.returncode,
+ streamdetails.uri,
+ seconds_streamed,
+ )
+ streamdetails.seconds_streamed = seconds_streamed
+ # store accurate duration
+ if finished and not streamdetails.seek_position and seconds_streamed:
+ streamdetails.duration = seconds_streamed
+
+ # parse loudnorm data if we have that collected
+ if loudness_details := parse_loudnorm(" ".join(ffmpeg_proc.log_history)):
+ required_seconds = 600 if streamdetails.media_type == MediaType.RADIO else 120
+ if finished or (seconds_streamed >= required_seconds):
+ logger.debug(
+ "Loudness measurement for %s: %s",
+ streamdetails.uri,
+ loudness_details,
+ )
+ streamdetails.loudness = loudness_details
self.mass.create_task(
- self.mass.music.mark_item_played(
- streamdetails.media_type,
- streamdetails.item_id,
- streamdetails.provider,
+ self.mass.music.set_track_loudness(
+ streamdetails.item_id, streamdetails.provider, loudness_details
)
)
- if music_prov := self.mass.get_provider(streamdetails.provider):
- self.mass.create_task(
- music_prov.on_streamed(streamdetails, seconds_streamed)
- )
+ # report playback
+ if finished or seconds_streamed > 30:
+ self.mass.create_task(
+ self.mass.music.mark_item_played(
+ streamdetails.media_type,
+ streamdetails.item_id,
+ streamdetails.provider,
+ )
+ )
+ if music_prov := self.mass.get_provider(streamdetails.provider):
+ self.mass.create_task(music_prov.on_streamed(streamdetails, seconds_streamed))
def _log_request(self, request: web.Request) -> None:
"""Log request."""
import os
import re
import struct
+import time
from collections import deque
from collections.abc import AsyncGenerator
from contextlib import suppress
"""Close/terminate the process and wait for exit."""
if self._stdin_task and not self._stdin_task.done():
self._stdin_task.cancel()
- with suppress(asyncio.CancelledError):
- await self._stdin_task
# make sure the stdin generator is also properly closed
# by propagating a cancellederror within
with suppress(RuntimeError):
async def _log_reader_task(self) -> None:
"""Read ffmpeg log from stderr."""
+ decode_errors = 0
async for line in self.iter_stderr():
if self.collect_log_history:
self.log_history.append(line)
if "error" in line or "warning" in line:
- self.logger.warning(line)
+ self.logger.debug(line)
elif "critical" in line:
- self.logger.critical(line)
+ self.logger.warning(line)
else:
self.logger.log(VERBOSE_LOG_LEVEL, line)
+ if "Invalid data found when processing input" in line:
+ decode_errors += 1
+ if decode_errors >= 50:
+ self.logger.error(line)
+ await super().close(True)
+
# if streamdetails contenttype is unknown, try parse it from the ffmpeg log
- if line.startswith("Stream #0:0: Audio: "):
+ if line.startswith("Stream #") and ": Audio: " in line:
if self.input_format.content_type == ContentType.UNKNOWN:
- content_type_raw = line.split("Stream #0:0: Audio: ")[1].split(" ")[0]
+ content_type_raw = line.split(": Audio: ")[1].split(" ")[0]
content_type = ContentType.try_parse(content_type_raw)
self.logger.info(
"Detected (input) content type: %s (%s)", content_type, content_type_raw
"""Feed stdin with audio chunks from an AsyncGenerator."""
if TYPE_CHECKING:
self.audio_input: AsyncGenerator[bytes, None]
- async for chunk in self.audio_input:
- await self.write(chunk)
- # write EOF once we've reached the end of the input stream
- await self.write_eof()
+ try:
+ async for chunk in self.audio_input:
+ await self.write(chunk)
+ # write EOF once we've reached the end of the input stream
+ await self.write_eof()
+ except Exception as err:
+ # make sure we dont swallow any exceptions and we bail out
+ # once our audio source fails.
+ if isinstance(err, asyncio.CancelledError):
+ self.logger.exception(err)
+ await self.close(True)
async def crossfade_pcm_parts(
Do not try to request streamdetails in advance as this is expiring data.
param media_item: The QueueItem for which to request the streamdetails for.
"""
+ if seek_position and (queue_item.media_type == MediaType.RADIO or not queue_item.duration):
+ LOGGER.warning("seeking is not possible on duration-less streams!")
+ seek_position = 0
if queue_item.streamdetails and seek_position:
LOGGER.debug(f"Using (pre)cached streamdetails from queue_item for {queue_item.uri}")
# we already have (fresh?) streamdetails stored on the queueitem, use these.
streamdetails.stream_type = StreamType.HLS
elif is_icy:
streamdetails.stream_type = StreamType.ICY
-
# set queue_id on the streamdetails so we know what is being streamed
streamdetails.queue_id = queue_item.queue_id
# handle skip/fade_in details
async def get_hls_stream(
- mass: MusicAssistant, url: str, streamdetails: StreamDetails
+ mass: MusicAssistant,
+ url: str,
+ streamdetails: StreamDetails,
+ seek_position: int = 0,
) -> AsyncGenerator[bytes, None]:
"""Get audio stream from HTTP HLS stream."""
logger = LOGGER.getChild("hls_stream")
+ logger.debug("Start streaming HLS stream for url %s", url)
timeout = ClientTimeout(total=0, connect=30, sock_read=5 * 60)
- # fetch master playlist and select (best) child playlist
- # https://datatracker.ietf.org/doc/html/draft-pantos-http-live-streaming-19#section-10
- async with mass.http_session.get(url, headers=HTTP_HEADERS, timeout=timeout) as resp:
- charset = resp.charset or "utf-8"
- master_m3u_data = await resp.text(charset)
- substreams = parse_m3u(master_m3u_data)
- if any(x for x in substreams if x.path.endswith(".ts")) or all(
- x for x in substreams if (x.stream_info or x.length)
- ):
- # the url we got is already a substream
- substream_url = url
- else:
- # sort substreams on best quality (highest bandwidth)
- substreams.sort(key=lambda x: int(x.stream_info.get("BANDWIDTH", "0")), reverse=True)
- substream = substreams[0]
- substream_url = substream.path
- if not substream_url.startswith("http"):
- # path is relative, stitch it together
- base_path = url.rsplit("/", 1)[0]
- substream_url = base_path + "/" + substream.path
-
- logger.debug(
- "Start streaming HLS stream for url %s (selected substream %s)", url, substream_url
- )
-
- if streamdetails.audio_format.content_type == ContentType.UNKNOWN:
- streamdetails.audio_format = AudioFormat(content_type=ContentType.AAC)
-
- prev_chunks: deque[str] = deque(maxlen=30)
+ prev_chunks: deque[str] = deque(maxlen=50)
has_playlist_metadata: bool | None = None
has_id3_metadata: bool | None = None
+ is_live_stream = streamdetails.media_type == MediaType.RADIO or not streamdetails.duration
+ # we simply select the best quality substream here
+ # if we ever want to support adaptive stream selection based on bandwidth
+ # we need to move the substream selection into the loop below and make it
+ # bandwidth aware. For now we just assume domestic high bandwidth where
+ # the user wants the best quality possible at all times.
+ substream_url = await get_hls_substream(mass, url)
+ seconds_skipped = 0
+ empty_loops = 0
while True:
+ logger.log(VERBOSE_LOG_LEVEL, "start streaming chunks from substream %s", substream_url)
async with mass.http_session.get(
substream_url, headers=HTTP_HEADERS, timeout=timeout
) as resp:
+ resp.raise_for_status()
charset = resp.charset or "utf-8"
substream_m3u_data = await resp.text(charset)
# get chunk-parts from the substream
hls_chunks = parse_m3u(substream_m3u_data)
+ chunk_seconds = 0
+ time_start = time.time()
for chunk_item in hls_chunks:
if chunk_item.path in prev_chunks:
continue
+ chunk_length = int(chunk_item.length) if chunk_item.length else 6
+ # try to support seeking here
+ if seek_position and (seconds_skipped + chunk_length) < seek_position:
+ seconds_skipped += chunk_length
+ continue
chunk_item_url = chunk_item.path
if not chunk_item_url.startswith("http"):
# path is relative, stitch it together
async with mass.http_session.get(
chunk_item_url, headers=HTTP_HEADERS, timeout=timeout
) as resp:
- async for chunk in resp.content.iter_any():
- yield chunk
+ yield await resp.content.read()
+ chunk_seconds += chunk_length
# handle (optional) in-band (m3u) metadata
if has_id3_metadata is not None and has_playlist_metadata:
continue
tags = await parse_tags(chunk_item_url)
has_id3_metadata = tags.title and tags.title not in chunk_item.path
logger.debug("Station support for in-band (ID3) metadata: %s", has_id3_metadata)
+ # end of stream reached - for non livestreams, we are ready and should return
+ # for livestreams we loop around to get the next playlist with chunks
+ if not is_live_stream:
+ return
+ # safeguard for an endless loop
+ # this may happen if we're simply going too fast for the live stream
+ # we already throttle it a bit but we may end up in a situation where something is wrong
+ # and we want to break out of this loop, hence this check
+ if chunk_seconds == 0:
+ empty_loops += 1
+ await asyncio.sleep(1)
+ else:
+ empty_loops = 0
+ if empty_loops == 50:
+ logger.warning("breaking out of endless loop")
+ break
+ # ensure that we're not going too fast - otherwise we get the same substream playlist
+ while (time.time() - time_start) < (chunk_seconds - 1):
+ await asyncio.sleep(0.5)
+
+
+async def get_hls_substream(
+ mass: MusicAssistant,
+ url: str,
+) -> str:
+ """Select the (highest quality) HLS substream for given HLS playlist/URL."""
+ timeout = ClientTimeout(total=0, connect=30, sock_read=5 * 60)
+ # fetch master playlist and select (best) child playlist
+ # https://datatracker.ietf.org/doc/html/draft-pantos-http-live-streaming-19#section-10
+ async with mass.http_session.get(url, headers=HTTP_HEADERS, timeout=timeout) as resp:
+ resp.raise_for_status()
+ charset = resp.charset or "utf-8"
+ master_m3u_data = await resp.text(charset)
+ substreams = parse_m3u(master_m3u_data)
+ if any(x for x in substreams if x.length):
+ # the url we got is already a substream
+ return url
+ # sort substreams on best quality (highest bandwidth)
+ substreams.sort(key=lambda x: int(x.stream_info.get("BANDWIDTH", "0")), reverse=True)
+ substream = substreams[0]
+ substream_url = substream.path
+ if not substream_url.startswith("http"):
+ # path is relative, stitch it together
+ base_path = url.rsplit("/", 1)[0]
+ substream_url = base_path + "/" + substream.path
+ return substream_url
async def get_http_stream(
"-nostats",
"-ignore_unknown",
"-protocol_whitelist",
- "file,http,https,tcp,tls,crypto,pipe,data,fd,rtp,udp",
+ "file,hls,http,https,tcp,tls,crypto,pipe,data,fd,rtp,udp",
]
# collect input args
input_args = []
"-i",
input_path,
]
- elif input_format.content_type in (
- ContentType.UNKNOWN,
- ContentType.M4A,
- ContentType.M4B,
- ContentType.MP4,
- ):
- # let ffmpeg guess/auto detect the content type
- input_args += ["-i", input_path]
else:
- # use explicit format identifier for all other
- input_args += ["-f", input_format.content_type.value, "-i", input_path]
+ # let ffmpeg auto detect the content type from the metadata/headers
+ input_args += ["-i", input_path]
# collect output args
output_args = []
# devnull stream
output_args = ["-f", "null", "-"]
elif output_format.content_type == ContentType.UNKNOWN:
- # use wav so we at least have some headers for the rest of the chain
- output_args = ["-f", "wav", output_path]
+ raise RuntimeError("Invalid output format specified")
else:
if output_format.content_type.is_pcm():
output_args += ["-acodec", output_format.content_type.name.lower()]
output_path,
]
+ # edge case: source file is not stereo - downmix to stereo
+ if input_format.channels > 2 and output_format.channels == 2:
+ filter_params = [
+ "pan=stereo|FL=1.0*FL+0.707*FC+0.707*SL+0.707*LFE|FR=1.0*FR+0.707*FC+0.707*SR+0.707*LFE",
+ *filter_params,
+ ]
+
# determine if we need to do resampling
if (
input_format.sample_rate != output_format.sample_rate
or input_format.bit_depth != output_format.bit_depth
):
# prefer resampling with libsoxr due to its high quality
- resample_filter = f'aresample=resampler={"soxr" if libsoxr_support else "swr"}'
+ if libsoxr_support:
+ resample_filter = "aresample=resampler=soxr:precision=28"
+ else:
+ resample_filter = "aresample=resampler=swr"
if output_format.bit_depth < input_format.bit_depth:
# apply dithering when going down to 16 bits
resample_filter += ":osf=s16:dither_method=triangular_hp"
- if not output_format.content_type.is_pcm():
- # specify sample rate if output format is not pcm
+ if input_format.sample_rate != output_format.sample_rate:
resample_filter += f":osr={output_format.sample_rate}"
filter_params.append(resample_filter)