queue.queue_id, CONF_CROSSFADE_DURATION, 8
)
crossfade_size = int(pcm_sample_size * crossfade_duration)
- buffer_size = int(pcm_sample_size * 2) # 2 seconds
- if use_crossfade:
- # buffer size needs to be big enough to include the crossfade part
- buffer_size += crossfade_size
bytes_written = 0
buffer = b""
# handle incoming audio chunks
queue_track.streamdetails,
pcm_format=pcm_format,
# strip silence from begin/end if track is being crossfaded
- strip_silence_begin=use_crossfade,
+ strip_silence_begin=use_crossfade and bytes_written > 0,
strip_silence_end=use_crossfade,
):
+ # the required buffer size is dynamic:
+ # it needs to be small when the flow stream starts
+ seconds_streamed = int(bytes_written / pcm_sample_size)
+ if not use_crossfade or seconds_streamed < 5:
+ buffer_size = pcm_sample_size
+ elif seconds_streamed < 10:
+ buffer_size = pcm_sample_size * 2
+ elif use_crossfade and seconds_streamed < 20:
+ buffer_size = pcm_sample_size * 5
+ else:
+ buffer_size = crossfade_size + pcm_sample_size * 2
+ # buffer size needs to be big enough to include the crossfade part
+
# ALWAYS APPEND CHUNK TO BUFFER
buffer += chunk
del chunk
is_radio = streamdetails.media_type == MediaType.RADIO or not streamdetails.duration
if is_radio or streamdetails.seek_position:
strip_silence_begin = False
- # pcm_sample_size = chunk size = 1 second of pcm audio
- chunk_size = pcm_format.pcm_sample_size
- expected_chunks = int(
- ((streamdetails.duration or 0) * pcm_format.pcm_sample_size) / chunk_size
- )
- if expected_chunks < 10:
+ if is_radio or streamdetails.duration < 30:
strip_silence_end = False
+ # pcm_sample_size = chunk size = 1 second of pcm audio
+ pcm_sample_size = pcm_format.pcm_sample_size
+ buffer_size_begin = pcm_sample_size * 2 if strip_silence_begin else pcm_sample_size
+ buffer_size_end = pcm_sample_size * 5 if strip_silence_end else pcm_sample_size
# collect all arguments for ffmpeg
filter_params = []
stderr_data = ""
async for line in ffmpeg_proc.iter_stderr():
line = line.decode().strip() # noqa: PLW2901
+ # if the streamdetails content type is unknown, try to parse it from the ffmpeg log output
+ # this has no actual use case, other than displaying the correct codec in the UI
+ if (
+ streamdetails.audio_format.content_type == ContentType.UNKNOWN
+ and line.startswith("Stream #0:0: Audio: ")
+ ):
+ streamdetails.audio_format.content_type = ContentType.try_parse(
+ line.split("Stream #0:0: Audio: ")[1].split(" ")[0]
+ )
if stderr_data or "loudnorm" in line:
stderr_data += line
+ elif "HTTP error" in line:
+ logger.warning(line)
elif line:
- self.logger.log(VERBOSE_LOG_LEVEL, line)
+ logger.log(VERBOSE_LOG_LEVEL, line)
del line
# if we reach this point, the process is finished (finish or aborted)
self.mass.create_task(log_reader(ffmpeg_proc, state_data))
# get pcm chunks from stdout
- # we always stay one chunk behind to properly detect end of chunks
+ # we always stay buffer_size of bytes behind
# so we can strip silence at the beginning and end of a track
- prev_chunk = b""
+ buffer = b""
chunk_num = 0
- async for chunk in ffmpeg_proc.iter_chunked(chunk_size):
+ async for chunk in ffmpeg_proc.iter_chunked(pcm_sample_size):
chunk_num += 1
+ required_buffer = buffer_size_begin if chunk_num < 10 else buffer_size_end
+ buffer += chunk
+ del chunk
+
+ if len(buffer) < required_buffer:
+ # buffer is not full enough, move on
+ continue
+
if strip_silence_begin and chunk_num == 2:
# first 2 chunks received, strip silence of beginning
stripped_audio = await strip_silence(
self.mass,
- prev_chunk + chunk,
+ buffer,
sample_rate=pcm_format.sample_rate,
bit_depth=pcm_format.bit_depth,
)
yield stripped_audio
state_data["bytes_sent"] += len(stripped_audio)
- prev_chunk = b""
+ buffer = b""
del stripped_audio
continue
- if strip_silence_end and chunk_num >= (expected_chunks - 6):
- # last part of the track, collect multiple chunks to strip silence later
- prev_chunk += chunk
- continue
-
- # middle part of the track, send previous chunk and collect current chunk
- if prev_chunk:
- yield prev_chunk
- state_data["bytes_sent"] += len(prev_chunk)
- # collect this chunk for next round
- prev_chunk = chunk
+ # otherwise: enough data in the buffer, feed it to the output
+ while len(buffer) > required_buffer:
+ subchunk = buffer[:pcm_sample_size]
+ buffer = buffer[pcm_sample_size:]
+ state_data["bytes_sent"] += len(subchunk)
+ yield subchunk
+ del subchunk
# if we did not receive any data, something went (terribly) wrong
# raise here to prevent an (endless) loop elsewhere
raise AudioError(f"stream error on {streamdetails.uri}")
# all chunks received, strip silence of last part if needed and yield remaining bytes
- if strip_silence_end and prev_chunk:
+ if strip_silence_end:
final_chunk = await strip_silence(
self.mass,
- prev_chunk,
+ buffer,
sample_rate=pcm_format.sample_rate,
bit_depth=pcm_format.bit_depth,
reverse=True,
)
else:
- final_chunk = prev_chunk
+ final_chunk = buffer
# yield final chunk to output (as one big chunk)
yield final_chunk
state_data["bytes_sent"] += len(final_chunk)
state_data["finished"].set()
del final_chunk
- del prev_chunk
+ del buffer
def _log_request(self, request: web.Request) -> None:
"""Log request."""
import os
import re
import struct
+from collections import deque
from io import BytesIO
from time import time
from typing import TYPE_CHECKING
fetch_playlist,
parse_m3u,
)
+from music_assistant.server.helpers.tags import parse_tags
from .process import AsyncProcess, check_output
from .util import create_tempfile
Do not try to request streamdetails in advance as this is expiring data.
param media_item: The QueueItem for which to request the streamdetails for.
"""
- if queue_item.streamdetails and (time() < queue_item.streamdetails.expires):
+ if queue_item.streamdetails and (time() + 60) < queue_item.streamdetails.expires:
LOGGER.debug(f"Using (pre)cached streamdetails from queue_item for {queue_item.uri}")
# we already have (fresh) streamdetails stored on the queueitem, use these.
# this happens for example while seeking in a track.
item_key = f"{music_prov.lookup_key}/{prov_media.item_id}"
cache_key = f"cached_streamdetails_{item_key}"
if cache := await mass.cache.get(cache_key):
- LOGGER.debug(f"Using cached streamdetails for {item_key}")
- streamdetails = StreamDetails.from_dict(cache)
- break
+ if time() + 60 < cache["expires"]:
+ LOGGER.debug(f"Using cached streamdetails for {item_key}")
+ streamdetails = StreamDetails.from_dict(cache)
# get streamdetails from provider
try:
streamdetails: StreamDetails = await music_prov.get_stream_details(
prov_media.item_id
)
- # store streamdetails in cache
- expiration = streamdetails.expires - time()
- if expiration > 300:
- await mass.cache.set(
- cache_key, streamdetails.to_dict(), expiration=expiration - 60
- )
except MusicAssistantError as err:
LOGGER.warning(str(err))
else:
+ # store streamdetails in cache
+ expiration = streamdetails.expires - time()
+ await mass.cache.set(cache_key, streamdetails.to_dict(), expiration=expiration - 60)
break
else:
raise MediaNotFoundError(f"Unable to retrieve streamdetails for {queue_item}")
return file.getvalue()
-async def resolve_radio_stream(mass: MusicAssistant, url: str) -> tuple[str, bool, bool]:
+async def resolve_radio_stream(
+ mass: MusicAssistant, url: str, use_get: bool = False
+) -> tuple[str, bool, bool]:
"""
Resolve a streaming radio URL.
- bool if the URL represents a HLS stream/playlist.
"""
base_url = url.split("?")[0]
- cache_key = f"resolved_radio_{url}"
+ cache_key = f"RADIO_RESOLVED_{url}"
if cache := await mass.cache.get(cache_key):
return cache
is_hls = False
resolved_url = url
timeout = ClientTimeout(total=0, connect=10, sock_read=5)
try:
- async with mass.http_session.head(
- url, headers=VLC_HEADERS_ICY, allow_redirects=True, timeout=timeout
+ method = "GET" if use_get else "HEAD"
+ async with mass.http_session.request(
+ method, url, headers=VLC_HEADERS_ICY, allow_redirects=True, timeout=timeout
) as resp:
resolved_url = str(resp.real_url)
headers = resp.headers
- supports_icy = int(headers.get("icy-metaint", "0")) > 0
+ resp.raise_for_status()
+ if not resp.headers:
+ raise InvalidDataError("no headers found")
+ supports_icy = headers.get("icy-name") is not None or "Icecast" in headers.get("server", "")
is_hls = headers.get("content-type") in HLS_CONTENT_TYPES
if (
base_url.endswith((".m3u", ".m3u8", ".pls"))
is_hls = True
except (ClientResponseError, InvalidDataError) as err:
+ if not use_get:
+ return await resolve_radio_stream(mass, resolved_url, True)
LOGGER.warning("Error while parsing radio URL %s: %s", url, err)
return (resolved_url, supports_icy, is_hls)
result = (resolved_url, supports_icy, is_hls)
- await mass.cache.set(cache_key, result, expiration=86400)
+ cache_expiration = 24 * 3600 if url == resolved_url else 600
+ await mass.cache.set(cache_key, result, expiration=cache_expiration)
return result
async def get_hls_stream(
mass: MusicAssistant, url: str, streamdetails: StreamDetails
+) -> AsyncGenerator[bytes, None]:
+ """Get audio stream from HTTP HLS stream."""
+ logger = LOGGER.getChild("hls_stream")
+ timeout = ClientTimeout(total=0, connect=30, sock_read=5 * 60)
+ # fetch master playlist and select (best) child playlist
+ # https://datatracker.ietf.org/doc/html/draft-pantos-http-live-streaming-19#section-10
+ async with mass.http_session.get(url, headers=VLC_HEADERS, timeout=timeout) as resp:
+ charset = resp.charset or "utf-8"
+ master_m3u_data = await resp.text(charset)
+ substreams = parse_m3u(master_m3u_data)
+ if any(x for x in substreams if x.path.endswith(".ts")) or not all(
+ x for x in substreams if x.stream_info is not None
+ ):
+ # the url we got is already a substream
+ substream_url = url
+ else:
+ # sort substreams on best quality (highest bandwidth)
+ substreams.sort(key=lambda x: int(x.stream_info.get("BANDWIDTH", "0")), reverse=True)
+ substream = substreams[0]
+ substream_url = substream.path
+ if not substream_url.startswith("http"):
+ # path is relative, stitch it together
+ base_path = url.rsplit("/", 1)[0]
+ substream_url = base_path + "/" + substream.path
+
+ logger.debug(
+ "Start streaming HLS stream for url %s (selected substream %s)", url, substream_url
+ )
+
+ if streamdetails.audio_format.content_type == ContentType.UNKNOWN:
+ streamdetails.audio_format = AudioFormat(content_type=ContentType.AAC)
+
+ prev_chunks: deque[str] = deque(maxlen=30)
+ has_playlist_metadata: bool | None = None
+ has_id3_metadata: bool | None = None
+ while True:
+ async with mass.http_session.get(
+ substream_url, headers=VLC_HEADERS, timeout=timeout
+ ) as resp:
+ charset = resp.charset or "utf-8"
+ substream_m3u_data = await resp.text(charset)
+ # get chunk-parts from the substream
+ hls_chunks = parse_m3u(substream_m3u_data)
+ for chunk_item in hls_chunks:
+ if chunk_item.path in prev_chunks:
+ continue
+ chunk_item_url = chunk_item.path
+ if not chunk_item_url.startswith("http"):
+ # path is relative, stitch it together
+ base_path = substream_url.rsplit("/", 1)[0]
+ chunk_item_url = base_path + "/" + chunk_item.path
+ # handle (optional) in-playlist (timed) metadata
+ if has_playlist_metadata is None:
+ has_playlist_metadata = chunk_item.title is not None
+ logger.debug("Station support for in-playlist metadata: %s", has_playlist_metadata)
+ if has_playlist_metadata and chunk_item.title != "no desc":
+ # bbc (and maybe others?) set the title to 'no desc'
+ streamdetails.stream_title = chunk_item.title
+ logger.log(VERBOSE_LOG_LEVEL, "playing chunk %s", chunk_item)
+ # prevent that we play this chunk again if we loop through
+ prev_chunks.append(chunk_item.path)
+ async with mass.http_session.get(
+ chunk_item_url, headers=VLC_HEADERS, timeout=timeout
+ ) as resp:
+ async for chunk in resp.content.iter_any():
+ yield chunk
+ # handle (optional) in-band (m3u) metadata
+ if has_id3_metadata is not None and has_playlist_metadata:
+ continue
+ if has_id3_metadata in (None, True):
+ tags = await parse_tags(chunk_item_url)
+ has_id3_metadata = tags.title and tags.title not in chunk_item.path
+ logger.debug("Station support for in-band (ID3) metadata: %s", has_id3_metadata)
+
+
+async def get_hls_stream_org(
+ mass: MusicAssistant, url: str, streamdetails: StreamDetails
) -> AsyncGenerator[bytes, None]:
"""Get audio stream from HTTP HLS stream."""
timeout = ClientTimeout(total=0, connect=30, sock_read=5 * 60)
"-ignore_unknown",
"-protocol_whitelist",
"file,http,https,tcp,tls,crypto,pipe,data,fd",
- "-filter_complex_threads",
- "1",
]
# collect input args
input_args = []