import re
import struct
import time
-from collections import deque
from collections.abc import AsyncGenerator
from io import BytesIO
from typing import TYPE_CHECKING
from .ffmpeg import FFMpeg, get_ffmpeg_stream
from .playlists import IsHLSPlaylist, PlaylistItem, fetch_playlist, parse_m3u
from .process import AsyncProcess, check_output, communicate
-from .tags import parse_tags
from .throttle_retry import BYPASS_THROTTLER
from .util import TimedAsyncGenerator, create_tempfile, detect_charset
try:
await ffmpeg_proc.start()
async for chunk in TimedAsyncGenerator(
- ffmpeg_proc.iter_chunked(pcm_format.pcm_sample_size), 60
+ ffmpeg_proc.iter_chunked(pcm_format.pcm_sample_size), 300
):
# for radio streams we just yield all chunks directly
if streamdetails.media_type == MediaType.RADIO:
"""Get radio audio stream from HTTP HLS playlist."""
logger = LOGGER.getChild("hls_stream")
logger.debug("Start streaming HLS stream for url %s", url)
- timeout = ClientTimeout(total=0, connect=30, sock_read=5 * 60)
- prev_chunks: deque[str] = deque(maxlen=50)
- has_playlist_metadata: bool | None = None
- has_id3_metadata: bool | None = None
# we simply select the best quality substream here
# if we ever want to support adaptive stream selection based on bandwidth
# we need to move the substream selection into the loop below and make it
# the user wants the best quality possible at all times.
playlist_item = await get_hls_substream(mass, url)
substream_url = playlist_item.path
- empty_loops = 0
- while True:
+ loops = 50 if streamdetails.media_type != MediaType.RADIO else 1
+ while loops:
logger.log(VERBOSE_LOG_LEVEL, "start streaming chunks from substream %s", substream_url)
- async with mass.http_session.get(
- substream_url, headers=HTTP_HEADERS, timeout=timeout
- ) as resp:
- resp.raise_for_status()
- raw_data = await resp.read()
- encoding = resp.charset or await detect_charset(raw_data)
- substream_m3u_data = raw_data.decode(encoding)
- # get chunk-parts from the substream
- hls_chunks = parse_m3u(substream_m3u_data)
- chunk_seconds = 0
- time_start = time.time()
- for chunk_item in hls_chunks:
- if chunk_item.path in prev_chunks:
- continue
- chunk_length = int(chunk_item.length) if chunk_item.length else 6
- chunk_item_url = chunk_item.path
- if not chunk_item_url.startswith("http"):
- # path is relative, stitch it together
- base_path = substream_url.rsplit("/", 1)[0]
- chunk_item_url = base_path + "/" + chunk_item.path
- # handle (optional) in-playlist (timed) metadata
- if has_playlist_metadata is None:
- has_playlist_metadata = chunk_item.title not in (None, "")
- logger.debug("Station support for in-playlist metadata: %s", has_playlist_metadata)
- if has_playlist_metadata and chunk_item.title != "no desc":
- # bbc (and maybe others?) set the title to 'no desc'
- cleaned_stream_title = clean_stream_title(chunk_item.title)
- if cleaned_stream_title != streamdetails.stream_title:
- logger.log(
- VERBOSE_LOG_LEVEL, "HLS Radio streamtitle original: %s", chunk_item.title
- )
- logger.log(
- VERBOSE_LOG_LEVEL, "HLS Radio streamtitle cleaned: %s", cleaned_stream_title
- )
- streamdetails.stream_title = cleaned_stream_title
- logger.log(VERBOSE_LOG_LEVEL, "playing chunk %s", chunk_item)
- # prevent that we play this chunk again if we loop through
- prev_chunks.append(chunk_item.path)
- async with mass.http_session.get(
- chunk_item_url, headers=HTTP_HEADERS, timeout=timeout
- ) as resp:
- yield await resp.content.read()
- chunk_seconds += chunk_length
- # handle (optional) in-band (m3u) metadata
- if has_id3_metadata is not None and has_playlist_metadata:
+ # We simply let ffmpeg deal with parsing the HLS playlist and stichting chunks together.
+ # However we do not feed the playlist URL to ffmpeg directly to give us the possibility
+ # to monitor the stream title and other metadata for radio streams in the future.
+ # Also, we've seen cases where ffmpeg sometimes chokes in a stream and aborts, which is not
+ # very useful for radio streams which you want to simply go on forever, so we need to loop
+ # and restart ffmpeg in case of an error.
+ input_format = AudioFormat(content_type=ContentType.UNKNOWN)
+ audio_format_detected = False
+ async for chunk in get_ffmpeg_stream(
+ audio_input=substream_url,
+ input_format=input_format,
+ output_format=AudioFormat(content_type=ContentType.WAV),
+ ):
+ yield chunk
+ if audio_format_detected:
continue
- if has_id3_metadata in (None, True):
- tags = await parse_tags(chunk_item_url)
- has_id3_metadata = tags.title and tags.title not in chunk_item.path
- logger.debug("Station support for in-band (ID3) metadata: %s", has_id3_metadata)
-
- # end of playlist reached - we loop around to get the next playlist with chunks
- # safeguard for an endless loop
- # this may happen if we're simply going too fast for the live stream
- # we already throttle it a bit but we may end up in a situation where something is wrong
- # and we want to break out of this loop, hence this check
- if chunk_seconds == 0:
- empty_loops += 1
- await asyncio.sleep(1)
- else:
- empty_loops = 0
- if empty_loops == 50:
- logger.warning("breaking out of endless loop")
- break
- # ensure that we're not going to fast - otherwise we get the same substream playlist
- while (time.time() - time_start) < (chunk_seconds - 1):
- await asyncio.sleep(0.5)
+ if input_format.content_type not in (ContentType.UNKNOWN, ContentType.WAV):
+ # we need to determine the audio format from the first chunk
+ streamdetails.audio_format = input_format
+ audio_format_detected = True
+ loops -= 1
async def get_hls_substream(
raw_data = await resp.read()
encoding = resp.charset or await detect_charset(raw_data)
master_m3u_data = raw_data.decode(encoding)
- if not allow_encrypted and "EXT-X-KEY:METHOD=AES-128" in master_m3u_data:
- # for now we don't support encrypted HLS streams
+ if not allow_encrypted and "EXT-X-KEY:METHOD=" in master_m3u_data:
+ # for now we do not (yet) support encrypted HLS streams
raise InvalidDataError("HLS stream is encrypted, not supported")
substreams = parse_m3u(master_m3u_data)
- if any(x for x in substreams if x.length or x.key):
- # this is already a substream!
- return PlaylistItem(
- path=url,
- )
+ # There is a chance that we did not get a master playlist with subplaylists
+ # but just a single master/sub playlist with the actual audio stream(s)
+ # so we need to detect if the playlist child's contain audio streams or
+ # sub-playlists.
+ if any(
+ x
+ for x in substreams
+ if (x.length or x.path.endswith((".mp4", ".aac")))
+ and not x.path.endswith((".m3u", ".m3u8"))
+ ):
+ return PlaylistItem(path=url, key=substreams[0].key)
# sort substreams on best quality (highest bandwidth) when available
if any(x for x in substreams if x.stream_info):
substreams.sort(key=lambda x: int(x.stream_info.get("BANDWIDTH", "0")), reverse=True)