PCM_F32LE = "f32le" # PCM 32-bit floating-point little-endian
PCM_F64LE = "f64le" # PCM 64-bit floating-point little-endian
PCM = "pcm" # PCM generic (details determined later)
- MPEG_DASH = "dash"
UNKNOWN = "?"
@classmethod
import urllib.parse
from collections.abc import AsyncGenerator
from contextlib import suppress
-from typing import TYPE_CHECKING
+from typing import TYPE_CHECKING, Any
import shortuuid
from aiohttp import web
ConfigValueType,
)
from music_assistant.common.models.enums import ConfigEntryType, ContentType, MediaType
-from music_assistant.common.models.errors import QueueEmpty
+from music_assistant.common.models.errors import AudioError, QueueEmpty
from music_assistant.common.models.media_items import AudioFormat
+from music_assistant.common.models.streamdetails import StreamDetails
from music_assistant.constants import (
ANNOUNCE_ALERT_FILE,
CONF_BIND_IP,
CONF_PUBLISH_IP,
SILENCE_FILE,
UGP_PREFIX,
+ VERBOSE_LOG_LEVEL,
)
from music_assistant.server.helpers.audio import LOGGER as AUDIO_LOGGER
from music_assistant.server.helpers.audio import (
check_audio_support,
crossfade_pcm_parts,
+ get_ffmpeg_args,
get_ffmpeg_stream,
- get_media_stream,
get_player_filter_params,
+ get_radio_stream,
+ parse_loudnorm,
+ strip_silence,
)
+from music_assistant.server.helpers.process import AsyncProcess
from music_assistant.server.helpers.util import get_ips
from music_assistant.server.helpers.webserver import Webserver
from music_assistant.server.models.core_controller import CoreController
bit_depth=queue_item.streamdetails.audio_format.bit_depth,
)
async for chunk in get_ffmpeg_stream(
- audio_input=get_media_stream(
- self.mass,
+ audio_input=self._get_media_stream(
streamdetails=queue_item.streamdetails,
pcm_format=pcm_format,
),
bytes_written = 0
buffer = b""
# handle incoming audio chunks
- async for chunk in get_media_stream(
- self.mass,
+ async for chunk in self._get_media_stream(
queue_track.streamdetails,
pcm_format=pcm_format,
# strip silence from begin/end if track is being crossfaded
):
yield chunk
+ async def _get_media_stream(
+ self,
+ streamdetails: StreamDetails,
+ pcm_format: AudioFormat,
+ strip_silence_begin: bool = False,
+ strip_silence_end: bool = False,
+ ) -> AsyncGenerator[tuple[bool, bytes], None]:
+ """
+ Get the (raw PCM) audio stream for the given streamdetails.
+
+ Other than stripping silence at end and beginning and optional
+ volume normalization this is the pure, unaltered audio data as PCM chunks.
+ """
+ logger = self.logger.getChild("media_stream")
+ is_radio = streamdetails.media_type == MediaType.RADIO or not streamdetails.duration
+ if is_radio or streamdetails.seek_position:
+ strip_silence_begin = False
+ # chunk size = 2 seconds of pcm audio
+ pcm_sample_size = int(pcm_format.sample_rate * (pcm_format.bit_depth / 8) * 2)
+ chunk_size = pcm_sample_size * (1 if is_radio else 2)
+ expected_chunks = int((streamdetails.duration or 0) / 2)
+ if expected_chunks < 10:
+ strip_silence_end = False
+
+ # collect all arguments for ffmpeg
+ filter_params = []
+ extra_args = []
+ seek_pos = (
+ streamdetails.seek_position
+ if (streamdetails.direct or not streamdetails.can_seek)
+ else 0
+ )
+ if seek_pos:
+ # only use ffmpeg seeking if the provider stream does not support seeking
+ extra_args += ["-ss", str(seek_pos)]
+ if streamdetails.target_loudness is not None:
+ # add loudnorm filters
+ filter_rule = f"loudnorm=I={streamdetails.target_loudness}:LRA=7:tp=-2:offset=-0.5"
+ if streamdetails.loudness:
+ filter_rule += f":measured_I={streamdetails.loudness.integrated}"
+ filter_rule += f":measured_LRA={streamdetails.loudness.lra}"
+ filter_rule += f":measured_tp={streamdetails.loudness.true_peak}"
+ filter_rule += f":measured_thresh={streamdetails.loudness.threshold}"
+ filter_rule += ":print_format=json"
+ filter_params.append(filter_rule)
+ if streamdetails.fade_in:
+ filter_params.append("afade=type=in:start_time=0:duration=3")
+
+ if is_radio and streamdetails.direct and streamdetails.direct.startswith("http"):
+ # ensure we use the radio streamer for radio items
+ audio_source_iterator = get_radio_stream(self.mass, streamdetails.direct, streamdetails)
+ input_path = "-"
+ elif streamdetails.direct:
+ audio_source_iterator = None
+ input_path = streamdetails.direct
+ else:
+ audio_source_iterator = self.mass.get_provider(streamdetails.provider).get_audio_stream(
+ streamdetails,
+ seek_position=streamdetails.seek_position if streamdetails.can_seek else 0,
+ )
+ input_path = "-"
+
+ ffmpeg_args = get_ffmpeg_args(
+ input_format=streamdetails.audio_format,
+ output_format=pcm_format,
+ filter_params=filter_params,
+ extra_args=extra_args,
+ input_path=input_path,
+ loglevel="info", # needed for loudness measurement
+ )
+
+ async def log_reader(ffmpeg_proc: AsyncProcess, state_data: dict[str, Any]):
+ # To prevent stderr locking up, we must keep reading it
+ stderr_data = ""
+ async for line in ffmpeg_proc.iter_stderr():
+ line = line.decode().strip() # noqa: PLW2901
+ if not line:
+ continue
+ if stderr_data or "loudnorm" in line:
+ stderr_data += line
+ else:
+ self.logger.log(VERBOSE_LOG_LEVEL, line)
+
+ # if we reach this point, the process has exited (finished or aborted)
+ if ffmpeg_proc.returncode == 0:
+ await state_data["finished"].wait()
+ finished = ffmpeg_proc.returncode == 0 and state_data["finished"].is_set()
+ bytes_sent = state_data["bytes_sent"]
+ seconds_streamed = bytes_sent / pcm_sample_size if bytes_sent else 0
+ streamdetails.seconds_streamed = seconds_streamed
+ state_str = "finished" if finished else "aborted"
+ logger.debug(
+ "stream %s for: %s (%s seconds streamed, exitcode %s)",
+ state_str,
+ streamdetails.uri,
+ seconds_streamed,
+ ffmpeg_proc.returncode,
+ )
+ # store accurate duration
+ if finished:
+ streamdetails.duration = streamdetails.seek_position + seconds_streamed
+
+ # parse loudnorm data if we have that collected
+ if stderr_data and (loudness_details := parse_loudnorm(stderr_data)):
+ required_seconds = 600 if streamdetails.media_type == MediaType.RADIO else 120
+ if finished or (seconds_streamed >= required_seconds):
+ logger.debug(
+ "Loudness measurement for %s: %s", streamdetails.uri, loudness_details
+ )
+ streamdetails.loudness = loudness_details
+ await self.mass.music.set_track_loudness(
+ streamdetails.item_id, streamdetails.provider, loudness_details
+ )
+
+ # report playback
+ if finished or seconds_streamed > 30:
+ self.mass.create_task(
+ self.mass.music.mark_item_played(
+ streamdetails.media_type, streamdetails.item_id, streamdetails.provider
+ )
+ )
+ if music_prov := self.mass.get_provider(streamdetails.provider):
+ self.mass.create_task(music_prov.on_streamed(streamdetails, seconds_streamed))
+
+ # cleanup
+ del state_data
+ del ffmpeg_proc
+
+ async with AsyncProcess(
+ ffmpeg_args,
+ enable_stdin=audio_source_iterator is not None,
+ enable_stderr=True,
+ custom_stdin=audio_source_iterator,
+ name="ffmpeg_media_stream",
+ ) as ffmpeg_proc:
+ state_data = {"finished": asyncio.Event(), "bytes_sent": 0}
+ logger.debug("start media stream for: %s", streamdetails.uri)
+
+ self.mass.create_task(log_reader(ffmpeg_proc, state_data))
+
+ # get pcm chunks from stdout
+ # we always stay one chunk behind to properly detect end of chunks
+ # so we can strip silence at the beginning and end of a track
+ prev_chunk = b""
+ chunk_num = 0
+ async for chunk in ffmpeg_proc.iter_chunked(chunk_size):
+ chunk_num += 1
+ if strip_silence_begin and chunk_num == 2:
+ # first 2 chunks received, strip silence of beginning
+ stripped_audio = await strip_silence(
+ self.mass,
+ prev_chunk + chunk,
+ sample_rate=pcm_format.sample_rate,
+ bit_depth=pcm_format.bit_depth,
+ )
+ yield stripped_audio
+ state_data["bytes_sent"] += len(stripped_audio)
+ prev_chunk = b""
+ del stripped_audio
+ continue
+ if strip_silence_end and chunk_num >= (expected_chunks - 6):
+ # last part of the track, collect multiple chunks to strip silence later
+ prev_chunk += chunk
+ continue
+
+ # middle part of the track, send previous chunk and collect current chunk
+ if prev_chunk:
+ yield prev_chunk
+ state_data["bytes_sent"] += len(prev_chunk)
+
+ # collect this chunk for next round
+ prev_chunk = chunk
+
+ # we did not receive any data, something went wrong
+ # raise here to prevent an endless loop elsewhere
+ if state_data["bytes_sent"] == 0:
+ raise AudioError("stream error on %s", streamdetails.uri)
+
+ # all chunks received, strip silence of last part if needed and yield remaining bytes
+ if strip_silence_end and prev_chunk:
+ final_chunk = await strip_silence(
+ self.mass,
+ prev_chunk,
+ sample_rate=pcm_format.sample_rate,
+ bit_depth=pcm_format.bit_depth,
+ reverse=True,
+ )
+ else:
+ final_chunk = prev_chunk
+
+ # yield final chunk to output
+ yield final_chunk
+ state_data["bytes_sent"] += len(final_chunk)
+ state_data["finished"].set()
+ del final_chunk
+ del prev_chunk
+
def _log_request(self, request: web.Request) -> None:
"""Log request."""
if not self.logger.isEnabledFor(logging.DEBUG):
import os
import re
import struct
-from contextlib import suppress
from io import BytesIO
from time import time
from typing import TYPE_CHECKING
MediaNotFoundError,
MusicAssistantError,
)
-from music_assistant.common.models.media_items import AudioFormat, ContentType, MediaType
+from music_assistant.common.models.media_items import AudioFormat, ContentType
from music_assistant.common.models.streamdetails import LoudnessMeasurement, StreamDetails
from music_assistant.constants import (
CONF_EQ_BASS,
ROOT_LOGGER_NAME,
VERBOSE_LOG_LEVEL,
)
-from music_assistant.server.helpers.playlists import fetch_playlist
+from music_assistant.server.helpers.playlists import (
+ HLS_CONTENT_TYPES,
+ IsHLSPlaylist,
+ fetch_playlist,
+ parse_m3u,
+)
from .process import AsyncProcess, check_output
from .util import create_tempfile
LOGGER = logging.getLogger(f"{ROOT_LOGGER_NAME}.audio")
# pylint:disable=consider-using-f-string,too-many-locals,too-many-statements
+# ruff: noqa: PLR0915
+
+VLC_HEADERS = {"User-Agent": "VLC/3.0.2.LibVLC/3.0.2"}
+VLC_HEADERS_ICY = {**VLC_HEADERS, "Icy-MetaData": "1"}
async def crossfade_pcm_parts(
if not streamdetails.duration:
streamdetails.duration = queue_item.duration
- # make sure that ffmpeg handles mpeg dash streams directly
- if (
- streamdetails.audio_format.content_type == ContentType.MPEG_DASH
- and streamdetails.data
- and streamdetails.data.startswith("http")
- ):
- streamdetails.direct = streamdetails.data
return streamdetails
return file.getvalue()
-async def get_media_stream( # noqa: PLR0915
- mass: MusicAssistant,
- streamdetails: StreamDetails,
- pcm_format: AudioFormat,
- strip_silence_begin: bool = False,
- strip_silence_end: bool = False,
-) -> AsyncGenerator[tuple[bool, bytes], None]:
- """
- Get the (raw PCM) audio stream for the given streamdetails.
-
- Other than stripping silence at end and beginning and optional
- volume normalization this is the pure, unaltered audio data as PCM chunks.
- """
- bytes_sent = 0
- is_radio = streamdetails.media_type == MediaType.RADIO or not streamdetails.duration
- if is_radio or streamdetails.seek_position:
- strip_silence_begin = False
- # chunk size = 2 seconds of pcm audio
- pcm_sample_size = int(pcm_format.sample_rate * (pcm_format.bit_depth / 8) * 2)
- chunk_size = pcm_sample_size * (1 if is_radio else 2)
- expected_chunks = int((streamdetails.duration or 0) / 2)
- if expected_chunks < 10:
- strip_silence_end = False
-
- # collect all arguments for ffmpeg
- filter_params = []
- extra_args = []
- seek_pos = (
- streamdetails.seek_position if (streamdetails.direct or not streamdetails.can_seek) else 0
- )
- if seek_pos:
- # only use ffmpeg seeking if the provider stream does not support seeking
- extra_args += ["-ss", str(seek_pos)]
- if streamdetails.target_loudness is not None:
- # add loudnorm filters
- filter_rule = f"loudnorm=I={streamdetails.target_loudness}:LRA=7:tp=-2:offset=-0.5"
- if streamdetails.loudness:
- filter_rule += f":measured_I={streamdetails.loudness.integrated}"
- filter_rule += f":measured_LRA={streamdetails.loudness.lra}"
- filter_rule += f":measured_tp={streamdetails.loudness.true_peak}"
- filter_rule += f":measured_thresh={streamdetails.loudness.threshold}"
- filter_rule += ":print_format=json"
- filter_params.append(filter_rule)
- if streamdetails.fade_in:
- filter_params.append("afade=type=in:start_time=0:duration=3")
- ffmpeg_args = get_ffmpeg_args(
- input_format=streamdetails.audio_format,
- output_format=pcm_format,
- filter_params=filter_params,
- extra_args=extra_args,
- input_path=streamdetails.direct or "-",
- loglevel="info", # needed for loudness measurement
- )
-
- finished = False
-
- ffmpeg_proc = AsyncProcess(
- ffmpeg_args,
- enable_stdin=streamdetails.direct is None,
- enable_stderr=True,
- custom_stdin=mass.get_provider(streamdetails.provider).get_audio_stream(
- streamdetails,
- seek_position=streamdetails.seek_position if streamdetails.can_seek else 0,
- )
- if not streamdetails.direct
- else None,
- name="ffmpeg_media_stream",
- )
- await ffmpeg_proc.start()
- logger = LOGGER.getChild("media_stream")
- logger.debug("start media stream for: %s", streamdetails.uri)
-
- # get pcm chunks from stdout
- # we always stay one chunk behind to properly detect end of chunks
- # so we can strip silence at the beginning and end of a track
- prev_chunk = b""
- chunk_num = 0
- try:
- async for chunk in ffmpeg_proc.iter_chunked(chunk_size):
- chunk_num += 1
- if strip_silence_begin and chunk_num == 2:
- # first 2 chunks received, strip silence of beginning
- stripped_audio = await strip_silence(
- mass,
- prev_chunk + chunk,
- sample_rate=pcm_format.sample_rate,
- bit_depth=pcm_format.bit_depth,
- )
- yield stripped_audio
- bytes_sent += len(stripped_audio)
- prev_chunk = b""
- del stripped_audio
- continue
- if strip_silence_end and chunk_num >= (expected_chunks - 6):
- # last part of the track, collect multiple chunks to strip silence later
- prev_chunk += chunk
- continue
-
- # middle part of the track, send previous chunk and collect current chunk
- if prev_chunk:
- yield prev_chunk
- bytes_sent += len(prev_chunk)
- prev_chunk = chunk
-
- # all chunks received, strip silence of last part if needed and yield remaining bytes
- if strip_silence_end and prev_chunk:
- final_chunk = await strip_silence(
- mass,
- prev_chunk,
- sample_rate=pcm_format.sample_rate,
- bit_depth=pcm_format.bit_depth,
- reverse=True,
- )
- else:
- final_chunk = prev_chunk
- yield final_chunk
- bytes_sent += len(final_chunk)
- del final_chunk
- del prev_chunk
- finished = True
- finally:
- seconds_streamed = bytes_sent / pcm_sample_size if bytes_sent else 0
- streamdetails.seconds_streamed = seconds_streamed
- state_str = "finished" if finished else "aborted"
- logger.debug(
- "stream %s for: %s (%s seconds streamed)",
- state_str,
- streamdetails.uri,
- seconds_streamed,
- )
- # store accurate duration
- if finished:
- streamdetails.duration = streamdetails.seek_position + seconds_streamed
-
- # use communicate to read stderr and wait for exit
- # read log for loudness measurement (or errors)
- try:
- _, stderr = await asyncio.wait_for(ffmpeg_proc.communicate(), 5)
- except TimeoutError:
- stderr = b""
- # ensure to send close here so we terminate and cleanup the process
- await ffmpeg_proc.close()
- if ffmpeg_proc.returncode != 0 and not bytes_sent:
- logger.warning("stream error on %s", streamdetails.uri)
- elif stderr and (loudness_details := _parse_loudnorm(stderr)):
- required_seconds = 600 if streamdetails.media_type == MediaType.RADIO else 120
- if finished or (seconds_streamed >= required_seconds):
- LOGGER.debug("Loudness measurement for %s: %s", streamdetails.uri, loudness_details)
- streamdetails.loudness = loudness_details
- await mass.music.set_track_loudness(
- streamdetails.item_id, streamdetails.provider, loudness_details
- )
- elif stderr:
- logger.log(VERBOSE_LOG_LEVEL, stderr.decode())
-
- # report playback
- if finished or seconds_streamed > 30:
- mass.create_task(
- mass.music.mark_item_played(
- streamdetails.media_type, streamdetails.item_id, streamdetails.provider
- )
- )
- if music_prov := mass.get_provider(streamdetails.provider):
- mass.create_task(music_prov.on_streamed(streamdetails, seconds_streamed))
-
-
-async def resolve_radio_stream(mass: MusicAssistant, url: str) -> tuple[str, bool]:
+async def resolve_radio_stream(mass: MusicAssistant, url: str) -> tuple[str, bool, bool]:
"""
Resolve a streaming radio URL.
Unwraps any playlists if needed.
Determines if the stream supports ICY metadata.
- Returns unfolded URL and a bool if the URL supports ICY metadata.
+ Returns tuple;
+ - unfolded URL as string
+ - bool if the URL supports ICY metadata.
+ - bool if the URL represents an HLS stream/playlist.
"""
- cache_key = f"resolved_radio_url_{url}"
+ base_url = url.split("?")[0]
+ cache_key = f"resolved_radio_{url}"
if cache := await mass.cache.get(cache_key):
return cache
- # handle playlisted radio urls
- is_mpeg_dash = False
+ is_hls = False
supports_icy = False
- if ".m3u" in url or ".pls" in url:
- # url is playlist, try to figure out how to handle it
- with suppress(InvalidDataError, IndexError):
- playlist = await fetch_playlist(mass, url)
- if len(playlist) > 1 or ".m3u" in playlist[0] or ".pls" in playlist[0]:
- # if it is an mpeg-dash stream, let ffmpeg handle that
- is_mpeg_dash = True
- url = playlist[0]
- if not is_mpeg_dash:
- # determine ICY metadata support by looking at the http headers
- headers = {"Icy-MetaData": "1", "User-Agent": "VLC/3.0.2.LibVLC/3.0.2"}
- timeout = ClientTimeout(total=0, connect=10, sock_read=5)
- try:
- async with mass.http_session.head(
- url, headers=headers, allow_redirects=True, timeout=timeout
- ) as resp:
- headers = resp.headers
- supports_icy = int(headers.get("icy-metaint", "0")) > 0
- except ClientResponseError as err:
- LOGGER.debug("Error while parsing radio URL %s: %s", url, err)
-
- result = (url, supports_icy)
+ resolved_url = url
+ timeout = ClientTimeout(total=0, connect=10, sock_read=5)
+ try:
+ async with mass.http_session.head(
+ url, headers=VLC_HEADERS_ICY, allow_redirects=True, timeout=timeout
+ ) as resp:
+ resolved_url = str(resp.real_url)
+ headers = resp.headers
+ supports_icy = int(headers.get("icy-metaint", "0")) > 0
+ is_hls = headers.get("content-type") in HLS_CONTENT_TYPES
+ if (
+ base_url.endswith((".m3u", ".m3u8", ".pls"))
+ or headers.get("content-type") == "audio/x-mpegurl"
+ ):
+ # url is playlist, we need to unfold it
+ try:
+ for line in await fetch_playlist(mass, resolved_url):
+ if not line.is_url:
+ continue
+ # unfold first url of playlist
+ return await resolve_radio_stream(mass, line.path)
+ raise InvalidDataError("No content found in playlist")
+ except IsHLSPlaylist:
+ is_hls = True
+
+ except (ClientResponseError, InvalidDataError) as err:
+ LOGGER.warning("Error while parsing radio URL %s: %s", url, err)
+ return (resolved_url, supports_icy, is_hls)
+
+ result = (resolved_url, supports_icy, is_hls)
await mass.cache.set(cache_key, result, expiration=86400)
return result
mass: MusicAssistant, url: str, streamdetails: StreamDetails
) -> AsyncGenerator[bytes, None]:
"""Get radio audio stream from HTTP, including metadata retrieval."""
- headers = {"Icy-MetaData": "1", "User-Agent": "VLC/3.0.2.LibVLC/3.0.2"}
- timeout = ClientTimeout(total=0, connect=30, sock_read=60)
- retries = 5
- while retries:
+ resolved_url, supports_icy, is_hls = await resolve_radio_stream(mass, url)
+ retries = 0
+ while True:
try:
- async with mass.http_session.get(url, headers=headers, timeout=timeout) as resp:
- headers = resp.headers
- meta_int = int(headers.get("icy-metaint", "0"))
- # stream with ICY Metadata
- if meta_int:
- LOGGER.debug("Start streaming radio with ICY metadata from url %s", url)
- while True:
- try:
- audio_chunk = await resp.content.readexactly(meta_int)
- yield audio_chunk
- meta_byte = await resp.content.readexactly(1)
- meta_length = ord(meta_byte) * 16
- meta_data = await resp.content.readexactly(meta_length)
- except asyncio.exceptions.IncompleteReadError:
- break
- if not meta_data:
- continue
- meta_data = meta_data.rstrip(b"\0")
- stream_title = re.search(rb"StreamTitle='([^']*)';", meta_data)
- if not stream_title:
- continue
- stream_title = stream_title.group(1).decode()
- if stream_title != streamdetails.stream_title:
- streamdetails.stream_title = stream_title
- # Regular HTTP stream
- else:
- LOGGER.debug("Start streaming radio without ICY metadata from url %s", url)
- async for chunk in resp.content.iter_any():
- yield chunk
- LOGGER.debug("Finished streaming radio from url %s", url)
- except ClientError as err:
- LOGGER.warning(
- "Error while streaming radio %s: %s",
- url,
- str(err),
- exc_info=err if LOGGER.isEnabledFor(logging.DEBUG) else None,
- )
- if retries == 0:
+ retries += 1
+ if is_hls: # special HLS stream
+ async for chunk in get_hls_stream(mass, resolved_url, streamdetails):
+ yield chunk
+ elif supports_icy: # http stream supports icy metadata
+ async for chunk in get_icy_stream(mass, resolved_url, streamdetails):
+ yield chunk
+ else: # generic http stream (without icy metadata)
+ async for chunk in get_http_stream(mass, resolved_url, streamdetails):
+ yield chunk
+ except ClientError:
+ LOGGER.warning("Streaming radio %s failed, retrying...", streamdetails.uri)
+ if retries >= 5:
raise
- retries -= 1
+ await asyncio.sleep(1 * retries)
+
+
+async def get_icy_stream(
+ mass: MusicAssistant, url: str, streamdetails: StreamDetails
+) -> AsyncGenerator[bytes, None]:
+ """Get (radio) audio stream from HTTP, including ICY metadata retrieval."""
+ timeout = ClientTimeout(total=0, connect=30, sock_read=5 * 60)
+ LOGGER.debug("Start streaming radio with ICY metadata from url %s", url)
+ async with mass.http_session.get(url, headers=VLC_HEADERS_ICY, timeout=timeout) as resp:
+ headers = resp.headers
+ meta_int = int(headers["icy-metaint"])
+ while True:
+ try:
+ audio_chunk = await resp.content.readexactly(meta_int)
+ yield audio_chunk
+ meta_byte = await resp.content.readexactly(1)
+ meta_length = ord(meta_byte) * 16
+ meta_data = await resp.content.readexactly(meta_length)
+ except asyncio.exceptions.IncompleteReadError:
+ break
+ if not meta_data:
+ continue
+ meta_data = meta_data.rstrip(b"\0")
+ stream_title = re.search(rb"StreamTitle='([^']*)';", meta_data)
+ if not stream_title:
+ continue
+ stream_title = stream_title.group(1).decode()
+ if stream_title != streamdetails.stream_title:
+ streamdetails.stream_title = stream_title
+
+
+async def get_hls_stream(
+ mass: MusicAssistant, url: str, streamdetails: StreamDetails
+) -> AsyncGenerator[bytes, None]:
+ """Get audio stream from HTTP HLS stream."""
+ timeout = ClientTimeout(total=0, connect=30, sock_read=5 * 60)
+ # fetch master playlist and select (best) child playlist
+ # https://datatracker.ietf.org/doc/html/draft-pantos-http-live-streaming-19#section-10
+ async with mass.http_session.get(url, headers=VLC_HEADERS, timeout=timeout) as resp:
+ charset = resp.charset or "utf-8"
+ master_m3u_data = await resp.text(charset)
+ substreams = parse_m3u(master_m3u_data)
+ if any(x for x in substreams if x.path.endswith(".ts")) or not all(
+ x for x in substreams if x.stream_info is not None
+ ):
+ # the url we got is already a substream
+ substream_url = url
+ else:
+ # sort substreams on best quality (highest bandwidth)
+ substreams.sort(key=lambda x: int(x.stream_info.get("BANDWIDTH", "0")), reverse=True)
+ substream = substreams[0]
+ substream_url = substream.path
+ if not substream_url.startswith("http"):
+ # path is relative, stitch it together
+ base_path = url.rsplit("/", 1)[0]
+ substream_url = base_path + "/" + substream.path
+
+ async def watch_metadata():
+ # ffmpeg is not (yet?) able to handle metadata updates that are provided
+ # in the substream playlist and/or the ID3 metadata
+ # so we do that here in a separate task.
+ # this also picks up the basic metadata (such as the stream title) for the stream.
+ prev_chunk = ""
+ while True:
+ async with mass.http_session.get(
+ substream_url, headers=VLC_HEADERS, timeout=timeout
+ ) as resp:
+ charset = resp.charset or "utf-8"
+ substream_m3u_data = await resp.text(charset)
+ # get chunk-parts from the substream
+ hls_chunks = parse_m3u(substream_m3u_data)
+ metadata_found = False
+ for chunk_item in hls_chunks:
+ if chunk_item.path == prev_chunk:
+ continue
+ chunk_item_url = chunk_item.path
+ if not chunk_item_url.startswith("http"):
+ # path is relative, stitch it together
+ base_path = substream_url.rsplit("/", 1)[0]
+ chunk_item_url = base_path + "/" + chunk_item.path
+ if chunk_item.title and chunk_item.title != "no desc":
+ streamdetails.stream_title = chunk_item.title
+ metadata_found = True
+ # prevent that we play this chunk again if we loop through
+ prev_chunk = chunk_item.path
+ if chunk_item.length and chunk_item.length.isnumeric():
+ await asyncio.sleep(int(chunk_item.length))
+ else:
+ await asyncio.sleep(5)
+ if not metadata_found:
+ # this station does not provide metadata embedded in the HLS playlist
+ return
+
+ LOGGER.debug(
+ "Start streaming HLS stream for url %s (selected substream %s)", url, substream_url
+ )
+
+ input_format = streamdetails.audio_format
+ output_format = streamdetails.audio_format
+ if streamdetails.audio_format.content_type == ContentType.UNKNOWN:
+ streamdetails.audio_format = AudioFormat(content_type=ContentType.AAC)
+ output_format = AudioFormat(content_type=ContentType.FLAC)
+
+ try:
+ metadata_task = asyncio.create_task(watch_metadata())
+ async for chunk in get_ffmpeg_stream(
+ audio_input=substream_url,
+ input_format=input_format,
+ output_format=output_format,
+ ):
+ yield chunk
+ finally:
+ if metadata_task and not metadata_task.done():
+ metadata_task.cancel()
async def get_http_stream(
seek_position: int = 0,
) -> AsyncGenerator[bytes, None]:
"""Get audio stream from HTTP."""
+ LOGGER.debug("Start HTTP stream for %s (seek_position %s)", streamdetails.uri, seek_position)
if seek_position:
assert streamdetails.duration, "Duration required for seek requests"
# try to get filesize with a head request
buffer_all = False
bytes_received = 0
timeout = ClientTimeout(total=0, connect=30, sock_read=5 * 60)
- async with mass.http_session.get(url, headers=headers, timeout=timeout) as resp:
+ async with mass.http_session.get(url, headers=VLC_HEADERS, timeout=timeout) as resp:
is_partial = resp.status == 206
buffer_all = seek_position and not is_partial
async for chunk in resp.content.iter_any():
if buffer_all:
skip_bytes = streamdetails.size / streamdetails.duration * seek_position
yield buffer[:skip_bytes]
+ LOGGER.debug(
+ "Finished HTTP stream for %s (transferred %s/%s bytes)",
+ streamdetails.uri,
+ bytes_received,
+ streamdetails.size,
+ )
async def get_file_stream(
filter_params: list[str] | None = None,
extra_args: list[str] | None = None,
chunk_size: int | None = None,
+ loglevel: str | None = None,
) -> AsyncGenerator[bytes, None]:
"""
Get the ffmpeg audio stream as async generator.
Takes care of resampling and/or recoding if needed,
according to player preferences.
"""
+ if loglevel is None:
+ loglevel = "info" if LOGGER.isEnabledFor(VERBOSE_LOG_LEVEL) else "quiet"
use_stdin = not isinstance(audio_input, str)
ffmpeg_args = get_ffmpeg_args(
input_format=input_format,
extra_args=extra_args or [],
input_path="-" if use_stdin else audio_input,
output_path="-",
- loglevel="info" if LOGGER.isEnabledFor(VERBOSE_LOG_LEVEL) else "quiet",
+ loglevel=loglevel,
)
async with AsyncProcess(
ffmpeg_args,
return generic_args + input_args + extra_args + output_args
-def _parse_loudnorm(raw_stderr: bytes | str) -> LoudnessMeasurement | None:
+def parse_loudnorm(raw_stderr: bytes | str) -> LoudnessMeasurement | None:
"""Parse Loudness measurement from ffmpeg stderr output."""
stderr_data = raw_stderr.decode() if isinstance(raw_stderr, bytes) else raw_stderr
if "[Parsed_loudnorm_" not in stderr_data:
return None
stderr_data = stderr_data.split("[Parsed_loudnorm_")[1]
stderr_data = stderr_data.rsplit("]")[-1].strip()
+ stderr_data = stderr_data.rsplit("}")[0].strip() + "}"
try:
loudness_data = json_loads(stderr_data)
except JSON_DECODE_EXCEPTIONS:
-"""Helpers for parsing playlists."""
+"""Helpers for parsing (online and offline) playlists."""
from __future__ import annotations
+import configparser
import logging
+from dataclasses import dataclass
from typing import TYPE_CHECKING
+from urllib.parse import urlparse
-import aiohttp
+from aiohttp import client_exceptions
from music_assistant.common.models.errors import InvalidDataError
LOGGER = logging.getLogger(__name__)
+HLS_CONTENT_TYPES = (
+ # https://tools.ietf.org/html/draft-pantos-http-live-streaming-19#section-10
+ "application/vnd.apple.mpegurl",
+ # Additional informal types used by Mozilla gecko not included as they
+ # don't reliably indicate HLS streams
+)
-async def parse_m3u(m3u_data: str) -> list[str]:
- """Parse (only) filenames/urls from m3u playlist file."""
+class IsHLSPlaylist(InvalidDataError):
+ """The playlist is from an HLS stream and should not be parsed."""
+
+
+@dataclass
+class PlaylistItem:
+ """Playlist item."""
+
+ path: str
+ length: str | None = None
+ title: str | None = None
+ stream_info: dict[str, str] | None = None
+
+ @property
+ def is_url(self) -> bool:
+ """Validate the URL can be parsed and at least has scheme + netloc."""
+ result = urlparse(self.path)
+ return all([result.scheme, result.netloc])
+
+
+def parse_m3u(m3u_data: str) -> list[PlaylistItem]:
+ """Very simple m3u parser.
+
+ Based on https://github.com/dvndrsn/M3uParser/blob/master/m3uparser.py
+ """
+ # From Mozilla gecko source: https://github.com/mozilla/gecko-dev/blob/c4c1adbae87bf2d128c39832d72498550ee1b4b8/dom/media/DecoderTraits.cpp#L47-L52
+
m3u_lines = m3u_data.splitlines()
- lines = []
+
+ playlist = []
+
+ length = None
+ title = None
+ stream_info = None
+
for line in m3u_lines:
line = line.strip() # noqa: PLW2901
- if line.startswith("#"):
- # ignore metadata
+ if line.startswith("#EXTINF:"):
+ # Get length and title from #EXTINF line
+ info = line.split("#EXTINF:")[1].split(",", 1)
+ if len(info) != 2:
+ continue
+ length = info[0].strip()[0]
+ title = info[1].strip()
+ elif line.startswith("#EXT-X-STREAM-INF:"):
+ # HLS stream properties
+ # https://datatracker.ietf.org/doc/html/draft-pantos-http-live-streaming-19#section-10
+ stream_info = {}
+ for part in line.replace("#EXT-X-STREAM-INF:", "").split(","):
+ if "=" not in part:
+ continue
+ kev_value_parts = part.strip().split("=")
+ stream_info[kev_value_parts[0]] = kev_value_parts[1]
+ elif line.startswith("#"):
+ # Ignore other extensions
continue
- if len(line) != 0:
- # Get uri/path from all other, non-blank lines
- lines.append(line)
+ elif len(line) != 0:
+ # Get song path from all other, non-blank lines
+ playlist.append(
+ PlaylistItem(path=line, length=length, title=title, stream_info=stream_info)
+ )
+ # reset the song variables so it doesn't use the same EXTINF more than once
+ length = None
+ title = None
+ stream_info = None
- return lines
+ return playlist
-async def parse_pls(pls_data: str) -> list[str]:
+def parse_pls(pls_data: str) -> list[PlaylistItem]:
"""Parse (only) filenames/urls from pls playlist file."""
- pls_lines = pls_data.splitlines()
- lines = []
- for line in pls_lines:
- line = line.strip() # noqa: PLW2901
- if not line.startswith("File"):
- # ignore metadata lines
- continue
- if "=" in line:
- # Get uri/path from all other, non-blank lines
- lines.append(line.split("=")[1])
+ pls_parser = configparser.ConfigParser()
+ try:
+ pls_parser.read_string(pls_data, "playlist")
+ except configparser.Error as err:
+ raise InvalidDataError("Can't parse playlist") from err
+
+ if "playlist" not in pls_parser or pls_parser["playlist"].getint("Version") != 2:
+ raise InvalidDataError("Invalid playlist")
- return lines
+ try:
+ num_entries = pls_parser.getint("playlist", "NumberOfEntries")
+ except (configparser.NoOptionError, ValueError) as err:
+ raise InvalidDataError("Invalid NumberOfEntries in playlist") from err
+ playlist_section = pls_parser["playlist"]
-async def fetch_playlist(mass: MusicAssistant, url: str) -> list[str]:
+ playlist = []
+ for entry in range(1, num_entries + 1):
+ file_option = f"File{entry}"
+ if file_option not in playlist_section:
+ continue
+ itempath = playlist_section[file_option]
+ playlist.append(
+ PlaylistItem(
+ length=playlist_section.get(f"Length{entry}"),
+ title=playlist_section.get(f"Title{entry}"),
+ path=itempath,
+ )
+ )
+ return playlist
+
+
+async def fetch_playlist(mass: MusicAssistant, url: str) -> list[PlaylistItem]:
"""Parse an online m3u or pls playlist."""
try:
async with mass.http_session.get(url, timeout=5) as resp:
except TimeoutError as err:
msg = f"Timeout while fetching playlist {url}"
raise InvalidDataError(msg) from err
- except aiohttp.client_exceptions.ClientError as err:
+ except client_exceptions.ClientError as err:
msg = f"Error while fetching playlist {url}"
raise InvalidDataError(msg) from err
+ if "#EXT-X-VERSION:" in playlist_data or "#EXT-X-STREAM-INF:" in playlist_data:
+ raise IsHLSPlaylist
+
if url.endswith((".m3u", ".m3u8")):
- playlist = await parse_m3u(playlist_data)
+ playlist = parse_m3u(playlist_data)
else:
- playlist = await parse_pls(playlist_data)
+ playlist = parse_pls(playlist_data)
if not playlist:
msg = f"Empty playlist {url}"
self._custom_stdin = None
self.attached_tasks.append(asyncio.create_task(self._feed_stdin(custom_stdin)))
self._custom_stdout = custom_stdout
+ self._stderr_locked = asyncio.Lock()
@property
def closed(self) -> bool:
task.cancel()
with suppress(asyncio.CancelledError):
await task
+
if self.proc.returncode is None:
# always first try to send sigint signal to try clean shutdown
# for example ffmpeg needs this to cleanly shutdown and not lock on pipes
# especially with pipes this can cause deadlocks if not properly guarded
# we need to use communicate to ensure buffers are flushed
# we do that with sending communicate
+ if self._enable_stdin and not self.proc.stdin.is_closing():
+ self.proc.stdin.close()
try:
- await asyncio.wait_for(self.proc.communicate(), 2)
+ if self.proc.stdout and self._stderr_locked.locked():
+ await asyncio.wait_for(self.proc.stdout.read(), 5)
+ else:
+ await asyncio.wait_for(self.proc.communicate(), 5)
except TimeoutError:
LOGGER.debug(
"Process %s with PID %s did not stop in time. Sending terminate...",
stdout, stderr = await self.proc.communicate(input_data)
return (stdout, stderr)
- async def read_stderr(self) -> AsyncGenerator[bytes, None]:
- """Read lines from the stderr stream."""
- async for line in self.proc.stderr:
- yield line
+ async def iter_stderr(self) -> AsyncGenerator[bytes, None]:
+ """Iterate lines from the stderr stream."""
+ async with self._stderr_locked:
+ async for line in self.proc.stderr:
+ yield line
async def _feed_stdin(self, custom_stdin: AsyncGenerator[bytes, None]) -> None:
"""Feed stdin with chunks from an AsyncGenerator."""
lost_packets = 0
prev_metadata_checksum: str = ""
prev_progress_report: float = 0
- async for line in self._cliraop_proc.read_stderr():
+ async for line in self._cliraop_proc.iter_stderr():
line = line.decode().strip() # noqa: PLW2901
if not line:
continue
playlist_data = playlist_data.decode(encoding_details["encoding"] or "utf-8")
if ext in ("m3u", "m3u8"):
- playlist_lines = await parse_m3u(playlist_data)
+ playlist_lines = parse_m3u(playlist_data)
else:
- playlist_lines = await parse_pls(playlist_data)
+ playlist_lines = parse_pls(playlist_data)
- for line_no, playlist_line in enumerate(playlist_lines, 1):
+ for line_no, playlist_line in enumerate(playlist_lines):
if media_item := await self._parse_playlist_line(
- playlist_line, os.path.dirname(prov_playlist_id), line_no
+ playlist_line.path, os.path.dirname(prov_playlist_id), line_no
):
yield media_item
if not await self.exists(prov_playlist_id):
msg = f"Playlist path does not exist: {prov_playlist_id}"
raise MediaNotFoundError(msg)
- cur_lines = []
_, ext = prov_playlist_id.rsplit(".", 1)
# get playlist file contents
playlist_data += chunk
encoding_details = await asyncio.to_thread(cchardet.detect, playlist_data)
playlist_data = playlist_data.decode(encoding_details["encoding"] or "utf-8")
-
+ # get current contents first
if ext in ("m3u", "m3u8"):
- playlist_lines = await parse_m3u(playlist_data)
+ playlist_items = parse_m3u(playlist_data)
else:
- playlist_lines = await parse_pls(playlist_data)
-
- for line_no, playlist_line in enumerate(playlist_lines, 1):
- if line_no not in positions_to_remove:
- cur_lines.append(playlist_line)
-
- new_playlist_data = "\n".join(cur_lines)
- # write playlist file (always in utf-8)
+ playlist_items = parse_pls(playlist_data)
+ # remove items by index
+ for i in sorted(positions_to_remove, reverse=True):
+ del playlist_items[i]
+
+ # build new playlist data
+ new_playlist_data = "#EXTM3U\n"
+ for item in playlist_items:
+ new_playlist_data += f"#EXTINF:{item.length or 0},{item.title or ''}\n"
+ new_playlist_data += f"{item.path}\n"
await self.write_file_content(prov_playlist_id, new_playlist_data.encode("utf-8"))
async def create_playlist(self, name: str) -> Playlist:
# as creating a new (empty) file with the m3u extension...
# filename = await self.resolve(f"{name}.m3u")
filename = f"{name}.m3u"
- await self.write_file_content(filename, b"")
+ await self.write_file_content(filename, b"#EXTM3U\n")
return await self.get_playlist(filename)
async def get_stream_details(self, item_id: str) -> StreamDetails:
SearchResults,
)
from music_assistant.common.models.streamdetails import StreamDetails
-from music_assistant.server.helpers.audio import get_radio_stream, resolve_radio_stream
from music_assistant.server.models.music_provider import MusicProvider
SUPPORTED_FEATURES = (ProviderFeature.SEARCH, ProviderFeature.BROWSE)
"""Get streamdetails for a radio station."""
stream = await self.radios.station(uuid=item_id)
await self.radios.station_click(uuid=item_id)
- url_resolved, supports_icy = await resolve_radio_stream(self.mass, stream.url_resolved)
return StreamDetails(
provider=self.domain,
item_id=item_id,
content_type=ContentType.try_parse(stream.codec),
),
media_type=MediaType.RADIO,
- data=url_resolved,
- direct=url_resolved if not supports_icy else None,
- expires=time() + 24 * 3600,
+ direct=stream.url_resolved,
+ expires=time() + 3600,
)
-
- async def get_audio_stream(
- self,
- streamdetails: StreamDetails,
- seek_position: int = 0,
- ) -> AsyncGenerator[bytes, None]:
- """Return the audio stream for the provider item."""
- async for chunk in get_radio_stream(self.mass, streamdetails.data, streamdetails):
- yield chunk
)
from music_assistant.common.models.streamdetails import StreamDetails
from music_assistant.constants import CONF_USERNAME
-from music_assistant.server.helpers.audio import get_radio_stream, resolve_radio_stream
from music_assistant.server.helpers.tags import parse_tags
from music_assistant.server.models.music_provider import MusicProvider
content_type=ContentType.UNKNOWN,
),
media_type=MediaType.RADIO,
- data=item_id,
+ direct=item_id,
+ expires=time() + 3600,
)
stream_item_id, media_type = item_id.split("--", 1)
stream_info = await self.__get_data("Tune.ashx", id=stream_item_id)
for stream in stream_info["body"]:
if stream["media_type"] != media_type:
continue
- # check if the radio stream is not a playlist
- url_resolved, supports_icy = await resolve_radio_stream(self.mass, stream["url"])
return StreamDetails(
provider=self.domain,
item_id=item_id,
content_type=ContentType(stream["media_type"]),
),
media_type=MediaType.RADIO,
- data=url_resolved,
- expires=time() + 24 * 3600,
- direct=url_resolved if not supports_icy else None,
+ direct=stream["url"],
+ expires=time() + 3600,
)
msg = f"Unable to retrieve stream details for {item_id}"
raise MediaNotFoundError(msg)
- async def get_audio_stream(
- self,
- streamdetails: StreamDetails,
- seek_position: int = 0,
- ) -> AsyncGenerator[bytes, None]:
- """Return the audio stream for the provider item."""
- async for chunk in get_radio_stream(self.mass, streamdetails.data, streamdetails):
- yield chunk
-
async def __get_data(self, endpoint: str, **kwargs):
"""Get data from api."""
if endpoint.startswith("http"):
from typing import TYPE_CHECKING
from music_assistant.common.models.enums import ContentType, ImageType, MediaType
+from music_assistant.common.models.errors import MediaNotFoundError
from music_assistant.common.models.media_items import (
Artist,
AudioFormat,
get_radio_stream,
resolve_radio_stream,
)
-from music_assistant.server.helpers.playlists import fetch_playlist
from music_assistant.server.helpers.tags import AudioTags, parse_tags
from music_assistant.server.models.music_provider import MusicProvider
async def get_track(self, prov_track_id: str) -> Track:
"""Get full track details by id."""
+ # non-url item ids are only valid if they refer to an existing library item
+ db_item = await self.mass.music.tracks.get_library_item_by_prov_id(
+ prov_track_id, self.instance_id
+ )
+ if db_item is None and not prov_track_id.startswith("http"):
+ msg = f"Track not found: {prov_track_id}"
+ raise MediaNotFoundError(msg)
return await self.parse_item(prov_track_id)
async def get_radio(self, prov_radio_id: str) -> Radio:
"""Get full radio details by id."""
- return await self.parse_item(prov_radio_id, force_radio=True)
+ # non-url item ids are only valid if they refer to an existing library item
+ db_item = await self.mass.music.radio.get_library_item_by_prov_id(
+ prov_radio_id, self.instance_id
+ )
+ if db_item is None and not prov_radio_id.startswith("http"):
+ msg = f"Radio not found: {prov_radio_id}"
+ raise MediaNotFoundError(msg)
+ return await self.parse_item(prov_radio_id)
async def get_artist(self, prov_artist_id: str) -> Track:
"""Get full artist details by id."""
async def parse_item(
self,
- item_id_or_url: str,
+ url: str,
force_refresh: bool = False,
force_radio: bool = False,
) -> Track | Radio:
"""Parse plain URL to MediaItem of type Radio or Track."""
- item_id, url, media_info = await self._get_media_info(item_id_or_url, force_refresh)
+ media_info = await self._get_media_info(url, force_refresh)
is_radio = media_info.get("icy-name") or not media_info.duration
provider_mappings = {
ProviderMapping(
- item_id=item_id,
+ item_id=url,
provider_domain=self.domain,
provider_instance=self.instance_id,
audio_format=AudioFormat(
if is_radio or force_radio:
# treat as radio
media_item = Radio(
- item_id=item_id,
+ item_id=url,
provider=self.domain,
name=media_info.get("icy-name") or media_info.title,
provider_mappings=provider_mappings,
)
else:
media_item = Track(
- item_id=item_id,
+ item_id=url,
provider=self.domain,
name=media_info.title,
duration=int(media_info.duration or 0),
]
return media_item
- async def _get_media_info(
- self, item_id_or_url: str, force_refresh: bool = False
- ) -> tuple[str, str, AudioTags]:
- """Retrieve (cached) mediainfo for url."""
- # check if the radio stream is not a playlist
- if item_id_or_url.endswith(("m3u8", "m3u", "pls")):
- playlist = await fetch_playlist(self.mass, item_id_or_url)
- url = playlist[0]
- item_id = item_id_or_url
- self._full_url[item_id] = url
- else:
- url = self._full_url.get(item_id_or_url, item_id_or_url)
- item_id = item_id_or_url
- cache_key = f"{self.instance_id}.media_info.{item_id}"
+ async def _get_media_info(self, url: str, force_refresh: bool = False) -> AudioTags:
+ """Retrieve mediainfo for url."""
# do we have some cached info for this url ?
+ cache_key = f"{self.instance_id}.media_info.{url}"
cached_info = await self.mass.cache.get(cache_key)
if cached_info and not force_refresh:
- media_info = AudioTags.parse(cached_info)
- else:
- # parse info with ffprobe (and store in cache)
- media_info = await parse_tags(url)
- if "authSig" in url:
- media_info.has_cover_image = False
- await self.mass.cache.set(cache_key, media_info.raw)
- return (item_id, url, media_info)
+ return AudioTags.parse(cached_info)
+ # parse info with ffprobe (and store in cache)
+ resolved_url, _, _ = await resolve_radio_stream(self.mass, url)
+ media_info = await parse_tags(resolved_url)
+ if "authSig" in url:
+ media_info.has_cover_image = False
+ await self.mass.cache.set(cache_key, media_info.raw)
+ return media_info
async def get_stream_details(self, item_id: str) -> StreamDetails:
"""Get streamdetails for a track/radio."""
- item_id, url, media_info = await self._get_media_info(item_id)
+ media_info = await self._get_media_info(item_id)
is_radio = media_info.get("icy-name") or not media_info.duration
- if is_radio:
- url, supports_icy = await resolve_radio_stream(self.mass, url)
return StreamDetails(
provider=self.instance_id,
item_id=item_id,
bit_depth=media_info.bits_per_sample,
),
media_type=MediaType.RADIO if is_radio else MediaType.TRACK,
- direct=None if is_radio and supports_icy else url,
- data=url,
+ direct=item_id if is_radio else None,
+ data=item_id,
)
async def get_audio_stream(