def _missing_(cls: Self, value: object) -> Self: # noqa: ARG003
"""Set default enum member if an unknown value is provided."""
return cls.UNKNOWN
+
+
+class StreamType(StrEnum):
+ """Enum for the type of streamdetails."""
+
+ HTTP = "http"
+ LOCAL_FILE = "local_file"
+ CUSTOM = "custom"
from __future__ import annotations
from dataclasses import dataclass
-from time import time
from typing import Any
from mashumaro import DataClassDictMixin
-from music_assistant.common.models.enums import MediaType
+from music_assistant.common.models.enums import MediaType, StreamType
from music_assistant.common.models.media_items import AudioFormat
item_id: str
audio_format: AudioFormat
media_type: MediaType = MediaType.TRACK
+ stream_type: StreamType = StreamType.CUSTOM
+ path: str | None = None
# stream_title: radio streams can optionally set this field
stream_title: str | None = None
duration: int | None = None
# total size in bytes of the item, calculated at eof when omitted
size: int | None = None
- # expires: timestamp this streamdetails expire
- expires: float = time() + 3600
# data: provider specific data (not exposed externally)
# this info is for example used to pass details to the get_audio_stream
data: Any = None
# can_seek: bool to indicate that the providers 'get_audio_stream' supports seeking of the item
can_seek: bool = True
+ # stream_type:
+
# the fields below will be set/controlled by the streamcontroller
seek_position: int = 0
fade_in: bool = False
PlayerType,
ProviderFeature,
ProviderType,
+ StreamType,
)
from music_assistant.common.models.errors import (
AlreadyRegisteredError,
audio_format=AudioFormat(
content_type=ContentType.try_parse(url),
),
+ stream_type=StreamType.HTTP,
media_type=MediaType.ANNOUNCEMENT,
data={"url": url, "use_pre_announce": use_pre_announce},
+ path=url,
target_loudness=-10,
),
)
import urllib.parse
from collections.abc import AsyncGenerator
from contextlib import suppress
-from typing import TYPE_CHECKING, Any
+from typing import TYPE_CHECKING
import shortuuid
from aiofiles.os import wrap
ConfigValueOption,
ConfigValueType,
)
-from music_assistant.common.models.enums import ConfigEntryType, ContentType, MediaType
-from music_assistant.common.models.errors import AudioError, QueueEmpty
+from music_assistant.common.models.enums import ConfigEntryType, ContentType, MediaType, StreamType
+from music_assistant.common.models.errors import QueueEmpty
from music_assistant.common.models.media_items import AudioFormat
from music_assistant.common.models.streamdetails import StreamDetails
from music_assistant.constants import (
CONF_PUBLISH_IP,
SILENCE_FILE,
UGP_PREFIX,
- VERBOSE_LOG_LEVEL,
)
from music_assistant.server.helpers.audio import LOGGER as AUDIO_LOGGER
from music_assistant.server.helpers.audio import (
+ FFMpeg,
check_audio_support,
crossfade_pcm_parts,
- get_ffmpeg_args,
get_ffmpeg_stream,
+ get_hls_stream,
+ get_icy_stream,
get_player_filter_params,
parse_loudnorm,
+ resolve_radio_stream,
strip_silence,
)
-from music_assistant.server.helpers.process import AsyncProcess
from music_assistant.server.helpers.util import get_ips
from music_assistant.server.helpers.webserver import Webserver
from music_assistant.server.models.core_controller import CoreController
strip_silence_end = False
# pcm_sample_size = chunk size = 1 second of pcm audio
pcm_sample_size = pcm_format.pcm_sample_size
- buffer_size_begin = pcm_sample_size * 2 if strip_silence_begin else pcm_sample_size
- buffer_size_end = pcm_sample_size * 5 if strip_silence_end else pcm_sample_size
+ buffer_size_begin = pcm_sample_size * 3 if strip_silence_begin else pcm_sample_size * 2
+ buffer_size_end = pcm_sample_size * 6 if strip_silence_end else pcm_sample_size * 2
# collect all arguments for ffmpeg
filter_params = []
if streamdetails.fade_in:
filter_params.append("afade=type=in:start_time=0:duration=3")
- ffmpeg_args = get_ffmpeg_args(
+ if streamdetails.stream_type == StreamType.CUSTOM:
+ audio_source = self.mass.get_provider(streamdetails.provider).get_audio_stream(
+ streamdetails,
+ seek_position=streamdetails.seek_position,
+ )
+ elif streamdetails.media_type == MediaType.RADIO:
+ resolved_url, supports_icy, is_hls = await resolve_radio_stream(
+ self.mass, streamdetails.path
+ )
+ if supports_icy:
+ audio_source = get_icy_stream(self.mass, resolved_url, streamdetails)
+ elif is_hls:
+ audio_source = get_hls_stream(self.mass, resolved_url, streamdetails)
+ else:
+ audio_source = resolved_url
+ else:
+ audio_source = streamdetails.path
+ extra_input_args = []
+ if streamdetails.seek_position and streamdetails.stream_type != StreamType.CUSTOM:
+ extra_input_args += ["-ss", str(int(streamdetails.seek_position))]
+ logger.debug("start media stream for: %s", streamdetails.uri)
+ state_data = {"finished": asyncio.Event(), "bytes_sent": 0}
+
+ async with FFMpeg(
+ audio_input=audio_source,
input_format=streamdetails.audio_format,
output_format=pcm_format,
filter_params=filter_params,
- input_path="-",
- # loglevel info is needed for loudness measurement
- loglevel="info",
# we criple ffmpeg a bit on purpose with the filter_threads
# option so it doesn't consume all cpu when calculating loudnorm
- extra_input_args=["-filter_threads", "1"],
- )
+ extra_input_args=[*extra_input_args, "-filter_threads", "1"],
+ name="ffmpeg_media_stream",
+ stderr_enabled=True,
+ ) as ffmpeg_proc:
- async def log_reader(ffmpeg_proc: AsyncProcess, state_data: dict[str, Any]):
- # To prevent stderr locking up, we must keep reading it
- stderr_data = ""
- async for line in ffmpeg_proc.iter_stderr():
- line = line.decode().strip() # noqa: PLW2901
- if not line:
- continue
- logger.log(VERBOSE_LOG_LEVEL, line)
- # if streamdetails contenttype is unknown, try parse it from the ffmpeg log output
- # this has no actual usecase, other than displaying the correct codec in the UI
- if (
- streamdetails.audio_format.content_type == ContentType.UNKNOWN
- and line.startswith("Stream #0:0: Audio: ")
- ):
- streamdetails.audio_format.content_type = ContentType.try_parse(
- line.split("Stream #0:0: Audio: ")[1].split(" ")[0]
- )
- if stderr_data or "loudnorm" in line:
- stderr_data += line
- elif "HTTP error" in line:
- logger.warning(line)
- del line
-
- # if we reach this point, the process is finished (finish or aborted)
- if ffmpeg_proc.returncode == 0:
- await state_data["finished"].wait()
- finished = ffmpeg_proc.returncode == 0 and state_data["finished"].is_set()
- bytes_sent = state_data["bytes_sent"]
- seconds_streamed = bytes_sent / pcm_format.pcm_sample_size if bytes_sent else 0
- streamdetails.seconds_streamed = seconds_streamed
- state_str = "finished" if finished else "aborted"
- logger.debug(
- "stream %s for: %s (%s seconds streamed, exitcode %s)",
- state_str,
- streamdetails.uri,
- seconds_streamed,
- ffmpeg_proc.returncode,
- )
- # store accurate duration
- if finished:
- streamdetails.duration = streamdetails.seek_position + seconds_streamed
-
- # parse loudnorm data if we have that collected
- if stderr_data and (loudness_details := parse_loudnorm(stderr_data)):
- required_seconds = 600 if streamdetails.media_type == MediaType.RADIO else 120
- if finished or (seconds_streamed >= required_seconds):
- logger.debug(
- "Loudness measurement for %s: %s", streamdetails.uri, loudness_details
- )
- streamdetails.loudness = loudness_details
- await self.mass.music.set_track_loudness(
- streamdetails.item_id, streamdetails.provider, loudness_details
- )
+ async def log_reader():
+ # To prevent stderr locking up, we must keep reading it
+ stderr_data = ""
+ async for line in ffmpeg_proc.iter_stderr():
+ if "error" in line or "warning" in line:
+ logger.warning(line)
+ elif "critical" in line:
+ logger.critical(line)
+ elif (
+ streamdetails.audio_format.content_type == ContentType.UNKNOWN
+ and line.startswith("Stream #0:0: Audio: ")
+ ):
+ # if streamdetails contenttype is unknown, try parse it from the ffmpeg log
+ streamdetails.audio_format.content_type = ContentType.try_parse(
+ line.split("Stream #0:0: Audio: ")[1].split(" ")[0]
+ )
+ elif stderr_data or "loudnorm" in line:
+ stderr_data += line
+ else:
+ logger.debug(line)
+ del line
+
+ # if we reach this point, the process is finished (completed or aborted)
+ if ffmpeg_proc.returncode == 0:
+ await state_data["finished"].wait()
+ finished = state_data["finished"].is_set()
+ bytes_sent = state_data["bytes_sent"]
+ seconds_streamed = bytes_sent / pcm_format.pcm_sample_size if bytes_sent else 0
+ streamdetails.seconds_streamed = seconds_streamed
+ state_str = "finished" if finished else "aborted"
+ logger.debug(
+ "stream %s for: %s (%s seconds streamed, exitcode %s)",
+ state_str,
+ streamdetails.uri,
+ seconds_streamed,
+ ffmpeg_proc.returncode,
+ )
+ # store accurate duration
+ if finished and not streamdetails.seek_position:
+ streamdetails.duration = seconds_streamed
+
+ # parse loudnorm data if we have that collected
+ if stderr_data and (loudness_details := parse_loudnorm(stderr_data)):
+ required_seconds = 600 if streamdetails.media_type == MediaType.RADIO else 120
+ if finished or (seconds_streamed >= required_seconds):
+ logger.debug(
+ "Loudness measurement for %s: %s", streamdetails.uri, loudness_details
+ )
+ streamdetails.loudness = loudness_details
+ await self.mass.music.set_track_loudness(
+ streamdetails.item_id, streamdetails.provider, loudness_details
+ )
- # report playback
- if finished or seconds_streamed > 30:
- self.mass.create_task(
- self.mass.music.mark_item_played(
- streamdetails.media_type, streamdetails.item_id, streamdetails.provider
+ # report playback
+ # TODO: Move this to the queue controller ?
+ if finished or seconds_streamed > 30:
+ self.mass.create_task(
+ self.mass.music.mark_item_played(
+ streamdetails.media_type, streamdetails.item_id, streamdetails.provider
+ )
)
- )
- if music_prov := self.mass.get_provider(streamdetails.provider):
- self.mass.create_task(music_prov.on_streamed(streamdetails, seconds_streamed))
- # cleanup
- del stderr_data
-
- audio_source = self.mass.get_provider(streamdetails.provider).get_audio_stream(
- streamdetails,
- seek_position=streamdetails.seek_position,
- )
- async with AsyncProcess(
- ffmpeg_args, stdin=audio_source, stdout=True, stderr=True, name="ffmpeg_media_stream"
- ) as ffmpeg_proc:
- state_data = {"finished": asyncio.Event(), "bytes_sent": 0}
- logger.debug("start media stream for: %s", streamdetails.uri)
+ if music_prov := self.mass.get_provider(streamdetails.provider):
+ self.mass.create_task(
+ music_prov.on_streamed(streamdetails, seconds_streamed)
+ )
+ # cleanup
+ del stderr_data
- self.mass.create_task(log_reader(ffmpeg_proc, state_data))
+ self.mass.create_task(log_reader())
# get pcm chunks from stdout
# we always stay buffer_size of bytes behind
# buffer is not full enough, move on
continue
- if strip_silence_begin and chunk_num == 2:
- # first 2 chunks received, strip silence of beginning
+ if strip_silence_begin and chunk_num == 3:
+ # first 3 chunks received, strip silence of beginning
stripped_audio = await strip_silence(
self.mass,
buffer,
yield subchunk
del subchunk
- # if we did not receive any data, something went (terribly) wrong
- # raise here to prevent an (endless) loop elsewhere
- if state_data["bytes_sent"] == 0:
- raise AudioError(f"stream error on {streamdetails.uri}")
-
# all chunks received, strip silence of last part if needed and yield remaining bytes
if strip_silence_end:
final_chunk = await strip_silence(
import struct
from collections import deque
from io import BytesIO
-from time import time
from typing import TYPE_CHECKING
import aiofiles
-from aiohttp import ClientResponseError, ClientTimeout
+from aiohttp import ClientTimeout
from music_assistant.common.helpers.global_cache import (
get_global_cache_value,
# pylint:disable=consider-using-f-string,too-many-locals,too-many-statements
# ruff: noqa: PLR0915
-VLC_HEADERS = {"User-Agent": "VLC/3.0.2.LibVLC/3.0.2"}
-VLC_HEADERS_ICY = {**VLC_HEADERS, "Icy-MetaData": "1"}
+HTTP_HEADERS = {"User-Agent": "Lavf/60.16.100.MusicAssistant"}
+HTTP_HEADERS_ICY = {**HTTP_HEADERS, "Icy-MetaData": "1"}
+
+
+class FFMpeg(AsyncProcess):
+ """FFMpeg wrapped as AsyncProcess."""
+
+ def __init__(
+ self,
+ audio_input: AsyncGenerator[bytes, None] | str | int,
+ input_format: AudioFormat,
+ output_format: AudioFormat,
+ filter_params: list[str] | None = None,
+ extra_args: list[str] | None = None,
+ extra_input_args: list[str] | None = None,
+ name: str = "ffmpeg",
+ stderr_enabled: bool = False,
+ audio_output: str | int = "-",
+ ) -> None:
+ """Initialize AsyncProcess."""
+ ffmpeg_args = get_ffmpeg_args(
+ input_format=input_format,
+ output_format=output_format,
+ filter_params=filter_params or [],
+ extra_args=extra_args or [],
+ input_path=audio_input if isinstance(audio_input, str) else "-",
+ output_path=audio_output if isinstance(audio_output, str) else "-",
+ extra_input_args=extra_input_args or [],
+ loglevel="info"
+ if stderr_enabled or LOGGER.isEnabledFor(VERBOSE_LOG_LEVEL)
+ else "error",
+ )
+ super().__init__(
+ ffmpeg_args,
+ stdin=True if isinstance(audio_input, str) else audio_input,
+ stdout=True if isinstance(audio_output, str) else audio_output,
+ stderr=stderr_enabled,
+ name=name,
+ )
async def crossfade_pcm_parts(
Do not try to request streamdetails in advance as this is expiring data.
param media_item: The QueueItem for which to request the streamdetails for.
"""
- if queue_item.streamdetails and (time() + 60) < queue_item.streamdetails.expires:
+ if queue_item.streamdetails and seek_position:
LOGGER.debug(f"Using (pre)cached streamdetails from queue_item for {queue_item.uri}")
- # we already have (fresh) streamdetails stored on the queueitem, use these.
- # this happens for example while seeking in a track.
+ # we already have (fresh?) streamdetails stored on the queueitem, use these.
+ # only do this when we're seeking.
# we create a copy (using to/from dict) to ensure the one-time values are cleared
streamdetails = StreamDetails.from_dict(queue_item.streamdetails.to_dict())
else:
if not music_prov:
LOGGER.debug(f"Skipping {prov_media} - provider not available")
continue # provider not available ?
- # prefer cache
- item_key = f"{music_prov.lookup_key}/{prov_media.item_id}"
- cache_key = f"cached_streamdetails_{item_key}"
- if cache := await mass.cache.get(cache_key):
- if time() + 60 < cache["expires"]:
- LOGGER.debug(f"Using cached streamdetails for {item_key}")
- streamdetails = StreamDetails.from_dict(cache)
# get streamdetails from provider
try:
streamdetails: StreamDetails = await music_prov.get_stream_details(
except MusicAssistantError as err:
LOGGER.warning(str(err))
else:
- # store streamdetails in cache
- expiration = streamdetails.expires - time()
- await mass.cache.set(cache_key, streamdetails.to_dict(), expiration=expiration - 60)
break
else:
raise MediaNotFoundError(f"Unable to retrieve streamdetails for {queue_item}")
return file.getvalue()
-async def resolve_radio_stream(
- mass: MusicAssistant, url: str, use_get: bool = False
-) -> tuple[str, bool, bool]:
+async def resolve_radio_stream(mass: MusicAssistant, url: str) -> tuple[str, bool, bool]:
"""
Resolve a streaming radio URL.
resolved_url = url
timeout = ClientTimeout(total=0, connect=10, sock_read=5)
try:
- method = "GET" if use_get else "HEAD"
- async with mass.http_session.request(
- method, url, headers=VLC_HEADERS_ICY, allow_redirects=True, timeout=timeout
+ async with mass.http_session.get(
+ url, headers=HTTP_HEADERS_ICY, allow_redirects=True, timeout=timeout
) as resp:
resolved_url = str(resp.real_url)
headers = resp.headers
resp.raise_for_status()
if not resp.headers:
raise InvalidDataError("no headers found")
- supports_icy = headers.get("icy-name") is not None or "Icecast" in headers.get("server", "")
+ supports_icy = headers.get("icy-metaint") is not None
is_hls = headers.get("content-type") in HLS_CONTENT_TYPES
if (
base_url.endswith((".m3u", ".m3u8", ".pls"))
except IsHLSPlaylist:
is_hls = True
- except (ClientResponseError, InvalidDataError) as err:
- if not use_get:
- return await resolve_radio_stream(mass, resolved_url, True)
+ except Exception as err:
LOGGER.warning("Error while parsing radio URL %s: %s", url, err)
return (resolved_url, supports_icy, is_hls)
return result
-async def get_radio_stream(
- mass: MusicAssistant, url: str, streamdetails: StreamDetails
-) -> AsyncGenerator[bytes, None]:
- """Get radio audio stream from HTTP, including metadata retrieval."""
- resolved_url, supports_icy, is_hls = await resolve_radio_stream(mass, url)
- # handle special HLS stream
- if is_hls:
- async for chunk in get_hls_stream(mass, resolved_url, streamdetails):
- yield chunk
- return
- # handle http stream supports icy metadata
- if supports_icy:
- async for chunk in get_icy_stream(mass, resolved_url, streamdetails):
- yield chunk
- return
- # generic http stream (without icy metadata)
- async for chunk in get_http_stream(mass, resolved_url, streamdetails):
- yield chunk
-
-
async def get_icy_stream(
mass: MusicAssistant, url: str, streamdetails: StreamDetails
) -> AsyncGenerator[bytes, None]:
"""Get (radio) audio stream from HTTP, including ICY metadata retrieval."""
timeout = ClientTimeout(total=0, connect=30, sock_read=5 * 60)
LOGGER.debug("Start streaming radio with ICY metadata from url %s", url)
- async with mass.http_session.get(url, headers=VLC_HEADERS_ICY, timeout=timeout) as resp:
+ async with mass.http_session.get(url, headers=HTTP_HEADERS_ICY, timeout=timeout) as resp:
headers = resp.headers
meta_int = int(headers["icy-metaint"])
while True:
timeout = ClientTimeout(total=0, connect=30, sock_read=5 * 60)
# fetch master playlist and select (best) child playlist
# https://datatracker.ietf.org/doc/html/draft-pantos-http-live-streaming-19#section-10
- async with mass.http_session.get(url, headers=VLC_HEADERS, timeout=timeout) as resp:
+ async with mass.http_session.get(url, headers=HTTP_HEADERS, timeout=timeout) as resp:
charset = resp.charset or "utf-8"
master_m3u_data = await resp.text(charset)
substreams = parse_m3u(master_m3u_data)
has_id3_metadata: bool | None = None
while True:
async with mass.http_session.get(
- substream_url, headers=VLC_HEADERS, timeout=timeout
+ substream_url, headers=HTTP_HEADERS, timeout=timeout
) as resp:
charset = resp.charset or "utf-8"
substream_m3u_data = await resp.text(charset)
chunk_item_url = base_path + "/" + chunk_item.path
# handle (optional) in-playlist (timed) metadata
if has_playlist_metadata is None:
- has_playlist_metadata = chunk_item.title is not None
+ has_playlist_metadata = chunk_item.title not in (None, "")
logger.debug("Station support for in-playlist metadata: %s", has_playlist_metadata)
if has_playlist_metadata and chunk_item.title != "no desc":
# bbc (and maybe others?) set the title to 'no desc'
# prevent that we play this chunk again if we loop through
prev_chunks.append(chunk_item.path)
async with mass.http_session.get(
- chunk_item_url, headers=VLC_HEADERS, timeout=timeout
+ chunk_item_url, headers=HTTP_HEADERS, timeout=timeout
) as resp:
async for chunk in resp.content.iter_any():
yield chunk
# try to get filesize with a head request
seek_supported = streamdetails.can_seek
if seek_position or not streamdetails.size:
- async with mass.http_session.head(url, headers=VLC_HEADERS) as resp:
+ async with mass.http_session.head(url, headers=HTTP_HEADERS) as resp:
resp.raise_for_status()
if size := resp.headers.get("Content-Length"):
streamdetails.size = int(size)
seek_supported = resp.headers.get("Accept-Ranges") == "bytes"
# headers
- headers = {**VLC_HEADERS}
+ headers = {**HTTP_HEADERS}
timeout = ClientTimeout(total=0, connect=30, sock_read=5 * 60)
skip_bytes = 0
if seek_position and streamdetails.size:
ContentType.M4B,
)
):
- LOGGER.debug(
- "Seeking in %s (%s) not possible, fallback to ffmpeg seeking.",
+ LOGGER.warning(
+ "Seeking in %s (%s) not possible.",
streamdetails.uri,
streamdetails.audio_format.output_format_str,
)
- async for chunk in get_ffmpeg_stream(
- url,
- # we must set the input content type to unknown to
- # enforce ffmpeg to determine it from the headers
- input_format=AudioFormat(content_type=ContentType.UNKNOWN),
- # enforce wav as we dont want to re-encode lossy formats
- # choose wav so we have descriptive headers and move on
- output_format=AudioFormat(content_type=ContentType.WAV),
- extra_input_args=["-ss", str(seek_position)],
- ):
- yield chunk
- return
+ seek_position = 0
+ streamdetails.seek_position = 0
# start the streaming from http
bytes_received = 0
ContentType.MP4,
)
):
- LOGGER.debug(
- "Seeking in %s (%s) not possible, fallback to ffmpeg seeking.",
+ LOGGER.warning(
+ "Seeking in %s (%s) not possible.",
streamdetails.uri,
streamdetails.audio_format.output_format_str,
)
- async for chunk in get_ffmpeg_stream(
- filename,
- # we must set the input content type to unknown to
- # enforce ffmpeg to determine it from the headers
- input_format=AudioFormat(content_type=ContentType.UNKNOWN),
- # enforce wav as we dont want to re-encode lossy formats
- # choose wav so we have descriptive headers and move on
- output_format=AudioFormat(content_type=ContentType.WAV),
- extra_input_args=["-ss", str(seek_position)],
- ):
- yield chunk
- return
+ seek_position = 0
+ streamdetails.seek_position = 0
chunk_size = get_chunksize(streamdetails.audio_format)
async with aiofiles.open(streamdetails.data, "rb") as _file:
filter_params: list[str] | None = None,
extra_args: list[str] | None = None,
chunk_size: int | None = None,
- ffmpeg_loglevel: str = "info",
extra_input_args: list[str] | None = None,
- logger: logging.Logger | None = None,
+ name: str = "ffmpeg",
) -> AsyncGenerator[bytes, None]:
"""
Get the ffmpeg audio stream as async generator.
Takes care of resampling and/or recoding if needed,
according to player preferences.
"""
- ffmpeg_args = get_ffmpeg_args(
+ async with FFMpeg(
+ audio_input=audio_input,
input_format=input_format,
output_format=output_format,
- filter_params=filter_params or [],
- extra_args=extra_args or [],
- input_path=audio_input if isinstance(audio_input, str) else "-",
- output_path="-",
- loglevel=ffmpeg_loglevel,
- extra_input_args=extra_input_args or [],
- )
- stdin = audio_input if not isinstance(audio_input, str) else True
- async with AsyncProcess(
- ffmpeg_args,
- stdin=stdin,
- stdout=True,
- stderr=logger or LOGGER.getChild("ffmpeg_stream"),
- name="ffmpeg_stream",
+ filter_params=filter_params,
+ extra_args=extra_args,
+ extra_input_args=extra_input_args,
+ name=name,
) as ffmpeg_proc:
# read final chunks from stdout
- chunk_size = chunk_size or get_chunksize(output_format, 1)
- async for chunk in ffmpeg_proc.iter_chunked(chunk_size):
+ iterator = ffmpeg_proc.iter_chunked(chunk_size) if chunk_size else ffmpeg_proc.iter_any()
+ async for chunk in iterator:
yield chunk
extra_args: list[str] | None = None,
input_path: str = "-",
output_path: str = "-",
- loglevel: str = "info",
extra_input_args: list[str] | None = None,
+ loglevel: str = "error",
) -> list[str]:
"""Collect all args to send to the ffmpeg process."""
if extra_args is None:
"-hide_banner",
"-loglevel",
loglevel,
+ "-nostats",
"-ignore_unknown",
"-protocol_whitelist",
"file,http,https,tcp,tls,crypto,pipe,data,fd",
from types import TracebackType
from typing import TYPE_CHECKING
-from music_assistant.constants import MASS_LOGGER_NAME, VERBOSE_LOG_LEVEL
+from music_assistant.constants import MASS_LOGGER_NAME
LOGGER = logging.getLogger(f"{MASS_LOGGER_NAME}.helpers.process")
args: list[str],
stdin: bool | int | AsyncGenerator[bytes, None] | None = None,
stdout: bool | int | None = None,
- stderr: bool | int | logging.Logger | None = None,
+ stderr: bool | int | None = False,
name: str | None = None,
) -> None:
"""Initialize AsyncProcess."""
self.proc: asyncio.subprocess.Process | None = None
- if isinstance(stderr, logging.Logger):
- self._stderr_logger = stderr
- stderr = asyncio.subprocess.PIPE
- else:
- self._stderr_logger = None
+ if name is None:
+ name = args[0].split(os.sep)[-1]
+ self.name = name
+ self.attached_tasks: list[asyncio.Task] = []
+ self.logger = LOGGER.getChild(name)
self._args = args
self._stdin = stdin
self._stdout = stdout
self._stderr_enabled = stderr not in (None, False)
self._close_called = False
self._returncode: bool | None = None
- if name is None:
- name = self._args[0].split(os.sep)[-1]
- self.name = name
- self.attached_tasks: list[asyncio.Task] = []
@property
def closed(self) -> bool:
async def start(self) -> None:
"""Perform Async init of process."""
- if self._stdin is True or isinstance(self._stdin, AsyncGenerator):
- stdin = asyncio.subprocess.PIPE
- else:
- stdin = self._stdin
- if self._stdout is True or isinstance(self._stdout, AsyncGenerator):
- stdout = asyncio.subprocess.PIPE
- else:
- stdout = self._stdout
- if self._stderr is True or isinstance(self._stderr, AsyncGenerator):
- stderr = asyncio.subprocess.PIPE
- else:
- stderr = self._stderr
self.proc = await asyncio.create_subprocess_exec(
*self._args,
- stdin=stdin if self._stdin_enabled else None,
- stdout=stdout if self._stdout_enabled else None,
- stderr=stderr if self._stderr_enabled else None,
+ stdin=asyncio.subprocess.PIPE
+ if (self._stdin is True or isinstance(self._stdin, AsyncGenerator))
+ else self._stdin,
+ stdout=asyncio.subprocess.PIPE if self._stdout is True else self._stdout,
+ stderr=asyncio.subprocess.PIPE if self._stderr is True else self._stderr,
)
- LOGGER.debug("Process %s started with PID %s", self.name, self.proc.pid)
- if not isinstance(self._stdin, int | None):
+ self.logger.debug("Process %s started with PID %s", self.name, self.proc.pid)
+ if isinstance(self._stdin, AsyncGenerator):
self.attached_tasks.append(asyncio.create_task(self._feed_stdin()))
- if self._stderr_logger:
- self.attached_tasks.append(asyncio.create_task(self._read_stderr()))
async def iter_chunked(self, n: int = DEFAULT_CHUNKSIZE) -> AsyncGenerator[bytes, None]:
"""Yield chunks of n size from the process stdout."""
if self.returncode is not None:
break
except TimeoutError:
- LOGGER.debug(
+ self.logger.debug(
"Process %s with PID %s did not stop in time. Sending terminate...",
self.name,
self.proc.pid,
)
self.proc.terminate()
- LOGGER.debug(
+ self.logger.debug(
"Process %s with PID %s stopped with returncode %s",
self.name,
self.proc.pid,
self._returncode = self.proc.returncode
return (stdout, stderr)
- async def iter_stderr(self) -> AsyncGenerator[bytes, None]:
- """Iterate lines from the stderr stream."""
- while self.returncode is None:
- try:
- line = await self.proc.stderr.readline()
- if line == b"":
- break
- yield line
- except ValueError as err:
- # we're waiting for a line (separator found), but the line was too big
- # this may happen with ffmpeg during a long (radio) stream where progress
- # gets outputted to the stderr but no newline
- # https://stackoverflow.com/questions/55457370/how-to-avoid-valueerror-separator-is-not-found-and-chunk-exceed-the-limit
- # NOTE: this consumes the line that was too big
- if "chunk exceed the limit" in str(err):
- continue
- # raise for all other (value) errors
- raise
-
async def _feed_stdin(self) -> None:
"""Feed stdin with chunks from an AsyncGenerator."""
if TYPE_CHECKING:
return
await self.write(chunk)
await self.write_eof()
- except asyncio.CancelledError:
+ except Exception as err:
+ if not isinstance(err, asyncio.CancelledError):
+ self.logger.exception(err)
# make sure the stdin generator is also properly closed
# by propagating a cancellederror within
task = asyncio.create_task(self._stdin.__anext__())
task.cancel()
- async def _read_stderr(self) -> None:
- """Read stderr and log to logger."""
- async for line in self.iter_stderr():
- line = line.decode().strip() # noqa: PLW2901
+ async def read_stderr(self) -> bytes:
+ """Read line from stderr."""
+ try:
+ return await self.proc.stderr.readline()
+ except ValueError as err:
+ # we're waiting for a line (separator found), but the line was too big
+ # this may happen with ffmpeg during a long (radio) stream where progress
+ # gets outputted to the stderr but no newline
+ # https://stackoverflow.com/questions/55457370/how-to-avoid-valueerror-separator-is-not-found-and-chunk-exceed-the-limit
+ # NOTE: this consumes the line that was too big
+ if "chunk exceed the limit" in str(err):
+ return await self.proc.stderr.readline()
+ # raise for all other (value) errors
+ raise
+
+ async def iter_stderr(self) -> AsyncGenerator[str, None]:
+ """Iterate lines from the stderr stream as string."""
+ while True:
+ line = await self.read_stderr()
+ if line == b"":
+ break
+ line = line.decode().strip()
if not line:
continue
- if "error" in line.lower():
- self._stderr_logger.error(line)
- elif "warning" in line.lower():
- self._stderr_logger.warning(line)
- else:
- self._stderr_logger.log(VERBOSE_LOG_LEVEL, line)
+ yield line
async def check_output(args: str | list[str]) -> tuple[int, bytes]:
async def get_audio_stream( # type: ignore[return]
self, streamdetails: StreamDetails, seek_position: int = 0
) -> AsyncGenerator[bytes, None]:
- """Return the audio stream for the provider item."""
+ """
+ Return the (custom) audio stream for the provider item.
+
+ Will only be called when the stream_type is set to CUSTOM.
+ """
raise NotImplementedError
async def on_streamed(self, streamdetails: StreamDetails, seconds_streamed: int) -> None:
input_format=self.input_format,
output_format=AIRPLAY_PCM_FORMAT,
filter_params=get_player_filter_params(self.mass, player_id),
- loglevel="fatal",
)
self._ffmpeg_proc = AsyncProcess(
ffmpeg_args,
ImageType,
MediaType,
ProviderFeature,
+ StreamType,
)
from music_assistant.common.models.errors import LoginFailed
from music_assistant.common.models.media_items import (
audio_format=AudioFormat(
content_type=ContentType.try_parse(url_details["format"].split("_")[0])
),
+ stream_type=StreamType.CUSTOM,
duration=int(song_data["DURATION"]),
data={"url": url, "format": url_details["format"]},
- expires=url_details["exp"],
size=int(song_data[f"FILESIZE_{url_details['format']}"]),
)
from music_assistant.common.models.config_entries import ConfigEntry, ConfigValueType
from music_assistant.common.models.enums import ConfigEntryType
from music_assistant.common.models.errors import SetupFailedError
-from music_assistant.common.models.streamdetails import StreamDetails
from music_assistant.constants import CONF_PATH
-from music_assistant.server.helpers.audio import get_file_stream
from .base import (
CONF_ENTRY_MISSING_ALBUM_ARTIST,
abs_path = get_absolute_path(self.base_path, file_path)
return await exists(abs_path)
- async def get_audio_stream(
- self, streamdetails: StreamDetails, seek_position: int = 0
- ) -> AsyncGenerator[bytes, None]:
- """Return the audio stream for the provider item."""
- abs_path = get_absolute_path(self.base_path, streamdetails.item_id)
- async for chunk in get_file_stream(self.mass, abs_path, streamdetails, seek_position):
- yield chunk
-
async def read_file_content(self, file_path: str, seek: int = 0) -> AsyncGenerator[bytes, None]:
"""Yield (binary) contents of file in chunks of bytes."""
abs_path = get_absolute_path(self.base_path, file_path)
ConfigEntryType,
ConfigValueOption,
)
-from music_assistant.common.models.enums import ExternalID, ProviderFeature
+from music_assistant.common.models.enums import ExternalID, ProviderFeature, StreamType
from music_assistant.common.models.errors import (
InvalidDataError,
MediaNotFoundError,
item_id=item_id,
audio_format=prov_mapping.audio_format,
media_type=MediaType.TRACK,
+ stream_type=StreamType.LOCAL_FILE if file_item.local_path else StreamType.CUSTOM,
duration=library_item.duration,
size=file_item.file_size,
- data=file_item.local_path,
+ data=file_item,
+ path=file_item.local_path,
can_seek=prov_mapping.audio_format.content_type in SEEKABLE_FILES,
)
ImageType,\r
MediaType,\r
ProviderFeature,\r
+ StreamType,\r
)\r
from music_assistant.common.models.errors import (\r
InvalidDataError,\r
from music_assistant.common.models.media_items import Playlist as JellyfinPlaylist\r
from music_assistant.common.models.media_items import Track as JellyfinTrack\r
from music_assistant.common.models.streamdetails import StreamDetails\r
-from music_assistant.server.helpers.audio import get_http_stream\r
\r
if TYPE_CHECKING:\r
from music_assistant.common.models.provider import ProviderManifest\r
jellyfin_track = API.get_item(self._jellyfin_server.jellyfin, item_id)\r
mimetype = self._media_mime_type(jellyfin_track)\r
media_stream = jellyfin_track[ITEM_KEY_MEDIA_STREAMS][0]\r
+ url = API.audio_url(\r
+ self._jellyfin_server.jellyfin, jellyfin_track[ITEM_KEY_ID], SUPPORTED_CONTAINER_FORMATS\r
+ )\r
if ITEM_KEY_MEDIA_CODEC in media_stream:\r
- media_type = ContentType.try_parse(media_stream[ITEM_KEY_MEDIA_CODEC])\r
+ content_type = ContentType.try_parse(media_stream[ITEM_KEY_MEDIA_CODEC])\r
else:\r
- media_type = ContentType.try_parse(mimetype)\r
+ content_type = ContentType.try_parse(mimetype)\r
return StreamDetails(\r
item_id=jellyfin_track[ITEM_KEY_ID],\r
provider=self.instance_id,\r
audio_format=AudioFormat(\r
- content_type=media_type,\r
+ content_type=content_type,\r
channels=jellyfin_track[ITEM_KEY_MEDIA_STREAMS][0][ITEM_KEY_MEDIA_CHANNELS],\r
),\r
+ stream_type=StreamType.HTTP,\r
duration=int(\r
jellyfin_track[ITEM_KEY_RUNTIME_TICKS] / 10000000\r
), # 10000000 ticks per millisecond)\r
- data=jellyfin_track,\r
+ path=url,\r
)\r
\r
def _get_thumbnail_url(self, client: JellyfinClient, media_item: dict[str, Any]) -> str | None:\r
mime_type, _ = mimetypes.guess_type(path)\r
\r
return mime_type\r
-\r
- async def get_audio_stream(\r
- self, streamdetails: StreamDetails, seek_position: int = 0\r
- ) -> AsyncGenerator[bytes, None]:\r
- """Return the audio stream for the provider item."""\r
- url = API.audio_url(\r
- self._jellyfin_server.jellyfin, streamdetails.item_id, SUPPORTED_CONTAINER_FORMATS\r
- )\r
-\r
- async for chunk in get_http_stream(self.mass, url, streamdetails, seek_position):\r
- yield chunk\r
SonicError,
)
-from music_assistant.common.models.enums import ContentType, ImageType, MediaType, ProviderFeature
+from music_assistant.common.models.enums import (
+ ContentType,
+ ImageType,
+ MediaType,
+ ProviderFeature,
+ StreamType,
+)
from music_assistant.common.models.errors import LoginFailed, MediaNotFoundError
from music_assistant.common.models.media_items import (
Album,
provider=self.instance_id,
can_seek=self._seek_support,
audio_format=AudioFormat(content_type=ContentType.try_parse(mime_type)),
+ stream_type=StreamType.CUSTOM,
duration=sonic_song.duration if sonic_song.duration is not None else 0,
)
ImageType,
MediaType,
ProviderFeature,
+ StreamType,
)
from music_assistant.common.models.errors import InvalidDataError, LoginFailed, MediaNotFoundError
from music_assistant.common.models.media_items import (
Track,
)
from music_assistant.common.models.streamdetails import StreamDetails
-from music_assistant.server.helpers.audio import get_http_stream
from music_assistant.server.helpers.auth import AuthenticationHelper
from music_assistant.server.helpers.tags import parse_tags
from music_assistant.server.models.music_provider import MusicProvider
content_type=media_type,
channels=media.audioChannels,
),
+ stream_type=StreamType.HTTP,
duration=plex_track.duration,
data=plex_track,
)
if media_type != ContentType.M4A:
- stream_details.data = self._plex_server.url(media_part.key, True)
+ stream_details.path = self._plex_server.url(media_part.key, True)
if audio_stream.samplingRate:
stream_details.audio_format.sample_rate = audio_stream.samplingRate
if audio_stream.bitDepth:
else:
url = plex_track.getStreamURL()
media_info = await parse_tags(url)
-
+ stream_details.path = url
stream_details.audio_format.channels = media_info.channels
stream_details.audio_format.content_type = ContentType.try_parse(media_info.format)
stream_details.audio_format.sample_rate = media_info.sample_rate
return stream_details
- async def get_audio_stream(
- self, streamdetails: StreamDetails, seek_position: int = 0
- ) -> AsyncGenerator[bytes, None]:
- """Return the audio stream for the provider item."""
- if isinstance(streamdetails.data, str):
- url = streamdetails.data
- else:
- url = streamdetails.data.getStreamURL(offset=seek_position)
- async for chunk in get_http_stream(self.mass, url, streamdetails, 0):
- yield chunk
-
async def get_myplex_account_and_refresh_token(self, auth_token: str) -> MyPlexAccount:
"""Get a MyPlexAccount object and refresh the token if needed."""
from music_assistant.common.helpers.util import parse_title_and_version, try_parse_int
from music_assistant.common.models.config_entries import ConfigEntry, ConfigValueType
-from music_assistant.common.models.enums import ConfigEntryType, ExternalID, ProviderFeature
+from music_assistant.common.models.enums import (
+ ConfigEntryType,
+ ExternalID,
+ ProviderFeature,
+ StreamType,
+)
from music_assistant.common.models.errors import LoginFailed, MediaNotFoundError
from music_assistant.common.models.media_items import (
Album,
from music_assistant.server.helpers.app_vars import app_var
# pylint: enable=no-name-in-module
-from music_assistant.server.helpers.audio import get_http_stream
from music_assistant.server.models.music_provider import MusicProvider
if TYPE_CHECKING:
else:
msg = f"Unsupported mime type for {item_id}"
raise MediaNotFoundError(msg)
+ self.mass.create_task(self._report_playback_started(streamdata))
return StreamDetails(
item_id=str(item_id),
provider=self.instance_id,
sample_rate=int(streamdata["sampling_rate"] * 1000),
bit_depth=streamdata["bit_depth"],
),
+ stream_type=StreamType.HTTP,
duration=streamdata["duration"],
data=streamdata, # we need these details for reporting playback
- expires=time.time() + 300, # url expires very fast
+ path=streamdata["url"],
)
- async def get_audio_stream(
- self, streamdetails: StreamDetails, seek_position: int = 0
- ) -> AsyncGenerator[bytes, None]:
- """Return the audio stream for the provider item."""
- # report playback started as soon as we start streaming
- self.mass.create_task(self._report_playback_started(streamdetails.data))
- async for chunk in get_http_stream(
- self.mass, streamdetails.data["url"], streamdetails, seek_position
- ):
- yield chunk
-
async def _report_playback_started(self, streamdata: dict) -> None:
"""Report playback start to qobuz."""
# TODO: need to figure out if the streamed track is purchased by user
from radios import FilterBy, Order, RadioBrowser, RadioBrowserError
-from music_assistant.common.models.enums import LinkType, ProviderFeature
+from music_assistant.common.models.enums import LinkType, ProviderFeature, StreamType
from music_assistant.common.models.media_items import (
AudioFormat,
BrowseFolder,
SearchResults,
)
from music_assistant.common.models.streamdetails import StreamDetails
-from music_assistant.server.helpers.audio import get_radio_stream
from music_assistant.server.models.music_provider import MusicProvider
SUPPORTED_FEATURES = (ProviderFeature.SEARCH, ProviderFeature.BROWSE)
content_type=ContentType.try_parse(stream.codec),
),
media_type=MediaType.RADIO,
- data=stream.url_resolved,
- expires=time() + 3600,
+ stream_type=StreamType.HTTP,
+ path=stream.url_resolved,
+ can_seek=False,
)
-
- async def get_audio_stream(
- self, streamdetails: StreamDetails, seek_position: int = 0
- ) -> AsyncGenerator[bytes, None]:
- """Return the audio stream for the provider item."""
- # report playback started as soon as we start streaming
- async for chunk in get_radio_stream(self.mass, streamdetails.data, streamdetails):
- yield chunk
from music_assistant.common.models.media_items import AudioFormat
from music_assistant.common.models.player import DeviceInfo, Player
from music_assistant.constants import UGP_PREFIX
-from music_assistant.server.helpers.audio import get_ffmpeg_args, get_player_filter_params
+from music_assistant.server.helpers.audio import FFMpeg, get_player_filter_params
from music_assistant.server.helpers.process import AsyncProcess, check_output
from music_assistant.server.models.player_provider import PlayerProvider
stream.set_callback(stream_callback)
stream_path = f"tcp://{host}:{port}"
self.logger.debug("Start streaming to %s", stream_path)
- ffmpeg_args = get_ffmpeg_args(
- input_format=input_format,
- output_format=DEFAULT_SNAPCAST_FORMAT,
- filter_params=get_player_filter_params(self.mass, player_id),
- output_path=f"tcp://{host}:{port}",
- loglevel="fatal",
- )
try:
- async with AsyncProcess(
- ffmpeg_args,
- stdin=True,
- stdout=False,
- stderr=self.logger.getChild("ffmpeg"),
+ async with FFMpeg(
+ audio_input=audio_source,
+ input_format=input_format,
+ output_format=DEFAULT_SNAPCAST_FORMAT,
+ filter_params=get_player_filter_params(self.mass, player_id),
name="snapcast_ffmpeg",
+ audio_output=f"tcp://{host}:{port}",
) as ffmpeg_proc:
- async for chunk in audio_source:
- await ffmpeg_proc.write(chunk)
- await ffmpeg_proc.write_eof()
+ await ffmpeg_proc.wait()
# we need to wait a bit for the stream status to become idle
# to ensure that all snapclients have consumed the audio
await self.mass.players.wait_for_state(player, PlayerState.IDLE)
"""Handle callback for topology change event."""
if xml := event.variables.get("zone_group_state"):
zgs = ET.fromstring(xml)
- for vanished_device in zgs.find("VanishedDevices") or []:
+ vanished_devices = zgs.find("VanishedDevices") or []
+ for vanished_device in vanished_devices:
if (reason := vanished_device.get("Reason")) not in SUPPORTED_VANISH_REASONS:
self.logger.debug(
"Ignoring %s marked %s as vanished with reason: %s",
from music_assistant.common.helpers.util import parse_title_and_version
from music_assistant.common.models.config_entries import ConfigEntry, ConfigValueType
-from music_assistant.common.models.enums import ConfigEntryType, ProviderFeature
+from music_assistant.common.models.enums import ConfigEntryType, ProviderFeature, StreamType
from music_assistant.common.models.errors import InvalidDataError, LoginFailed
from music_assistant.common.models.media_items import (
Artist,
Track,
)
from music_assistant.common.models.streamdetails import StreamDetails
-from music_assistant.server.helpers.audio import (
- get_hls_stream,
- get_http_stream,
- resolve_radio_stream,
-)
from music_assistant.server.models.music_provider import MusicProvider
CONF_CLIENT_ID = "client_id"
audio_format=AudioFormat(
content_type=ContentType.try_parse(stream_format),
),
+ stream_type=StreamType.HTTP,
data=url,
)
- async def get_audio_stream(
- self, streamdetails: StreamDetails, seek_position: int = 0
- ) -> AsyncGenerator[bytes, None]:
- """Return the audio stream for the provider item."""
- _, _, is_hls = await resolve_radio_stream(self.mass, streamdetails.data)
- if is_hls:
- # some soundcloud streams are HLS, prefer the radio streamer
- async for chunk in get_hls_stream(self.mass, streamdetails.data, streamdetails):
- yield chunk
- return
- # regular stream from http
- async for chunk in get_http_stream(
- self.mass, streamdetails.data, streamdetails, seek_position
- ):
- yield chunk
-
async def _parse_artist(self, artist_obj: dict) -> Artist:
"""Parse a Soundcloud user response to Artist model object."""
artist_id = None
from music_assistant.common.helpers.json import json_loads
from music_assistant.common.helpers.util import parse_title_and_version
from music_assistant.common.models.config_entries import ConfigEntry, ConfigValueType
-from music_assistant.common.models.enums import ConfigEntryType, ExternalID, ProviderFeature
+from music_assistant.common.models.enums import (
+ ConfigEntryType,
+ ExternalID,
+ ProviderFeature,
+ StreamType,
+)
from music_assistant.common.models.errors import LoginFailed, MediaNotFoundError
from music_assistant.common.models.media_items import (
Album,
async def get_stream_details(self, item_id: str) -> StreamDetails:
"""Return the content details for the given track when it will be streamed."""
- # make sure a valid track is requested.
- track = await self.get_track(item_id)
return StreamDetails(
- item_id=track.item_id,
+ item_id=item_id,
provider=self.instance_id,
audio_format=AudioFormat(
content_type=ContentType.OGG,
),
- duration=track.duration,
- # these streamdetails may be cached for a long time,
- # as there is no time sensitive info in them
- expires=time.time() + 30 * 24 * 3600,
+ stream_type=StreamType.CUSTOM,
)
async def get_audio_stream(
yield chunk
bytes_sent += len(chunk)
- if bytes_sent == 0 and not self._ap_workaround:
- # AP resolve failure
- # https://github.com/librespot-org/librespot/issues/972
- # retry with ap-port set to invalid value, which will force fallback
- args += ["--ap-port", "12345"]
- async with AsyncProcess(args, stdout=True) as librespot_proc:
- async for chunk in librespot_proc.iter_any():
- yield chunk
- self._ap_workaround = True
-
async def _parse_artist(self, artist_obj):
"""Parse spotify artist object to generic layout."""
artist = Artist(
ImageType,
MediaType,
ProviderFeature,
+ StreamType,
)
from music_assistant.common.models.errors import LoginFailed, MediaNotFoundError
from music_assistant.common.models.media_items import (
Track,
)
from music_assistant.common.models.streamdetails import StreamDetails
-from music_assistant.server.helpers.audio import get_http_stream
from music_assistant.server.helpers.auth import AuthenticationHelper
from music_assistant.server.helpers.tags import AudioTags, parse_tags
from music_assistant.server.models.music_provider import MusicProvider
bit_depth=media_info.bits_per_sample,
channels=media_info.channels,
),
+ stream_type=StreamType.HTTP,
duration=track.duration,
- data=url,
+ path=url,
)
- async def get_audio_stream(
- self, streamdetails: StreamDetails, seek_position: int = 0
- ) -> AsyncGenerator[bytes, None]:
- """Return the audio stream for the provider item."""
- # report playback started as soon as we start streaming
- async for chunk in get_http_stream(
- self.mass, streamdetails.data, streamdetails, seek_position
- ):
- yield chunk
-
async def get_artist(self, prov_artist_id: str) -> Artist:
"""Get artist details for given artist id."""
tidal_session = await self._get_tidal_session()
from __future__ import annotations
-from time import time
from typing import TYPE_CHECKING
from asyncio_throttle import Throttler
from music_assistant.common.helpers.util import create_sort_name
from music_assistant.common.models.config_entries import ConfigEntry, ConfigValueType
-from music_assistant.common.models.enums import ConfigEntryType, ProviderFeature
+from music_assistant.common.models.enums import ConfigEntryType, ProviderFeature, StreamType
from music_assistant.common.models.errors import InvalidDataError, LoginFailed, MediaNotFoundError
from music_assistant.common.models.media_items import (
AudioFormat,
)
from music_assistant.common.models.streamdetails import StreamDetails
from music_assistant.constants import CONF_USERNAME
-from music_assistant.server.helpers.audio import get_radio_stream
from music_assistant.server.helpers.tags import parse_tags
from music_assistant.server.models.music_provider import MusicProvider
content_type=ContentType.UNKNOWN,
),
media_type=MediaType.RADIO,
- data=item_id,
- expires=time() + 3600,
+ stream_type=StreamType.HTTP,
+ path=item_id,
+ can_seek=False,
)
stream_item_id, media_type = item_id.split("--", 1)
stream_info = await self.__get_data("Tune.ashx", id=stream_item_id)
content_type=ContentType(stream["media_type"]),
),
media_type=MediaType.RADIO,
- data=stream["url"],
- expires=time() + 3600,
+ stream_type=StreamType.HTTP,
+ path=stream["url"],
+ can_seek=False,
)
msg = f"Unable to retrieve stream details for {item_id}"
raise MediaNotFoundError(msg)
- async def get_audio_stream(
- self, streamdetails: StreamDetails, seek_position: int = 0
- ) -> AsyncGenerator[bytes, None]:
- """Return the audio stream for the provider item."""
- # report playback started as soon as we start streaming
- async for chunk in get_radio_stream(self.mass, streamdetails.data, streamdetails):
- yield chunk
-
async def __get_data(self, endpoint: str, **kwargs):
"""Get data from api."""
if endpoint.startswith("http"):
from __future__ import annotations
-import os
from typing import TYPE_CHECKING
-from music_assistant.common.models.enums import ContentType, ImageType, MediaType
+from music_assistant.common.models.enums import ContentType, ImageType, MediaType, StreamType
from music_assistant.common.models.errors import MediaNotFoundError
from music_assistant.common.models.media_items import (
Artist,
Track,
)
from music_assistant.common.models.streamdetails import StreamDetails
-from music_assistant.server.helpers.audio import (
- get_file_stream,
- get_http_stream,
- get_radio_stream,
- resolve_radio_stream,
-)
from music_assistant.server.helpers.tags import AudioTags, parse_tags
from music_assistant.server.models.music_provider import MusicProvider
if TYPE_CHECKING:
- from collections.abc import AsyncGenerator
-
from music_assistant.common.models.config_entries import (
ConfigEntry,
ConfigValueType,
if cached_info and not force_refresh:
return AudioTags.parse(cached_info)
# parse info with ffprobe (and store in cache)
- resolved_url, _, _ = await resolve_radio_stream(self.mass, url)
- media_info = await parse_tags(resolved_url)
+ media_info = await parse_tags(url)
if "authSig" in url:
media_info.has_cover_image = False
await self.mass.cache.set(cache_key, media_info.raw)
bit_depth=media_info.bits_per_sample,
),
media_type=MediaType.RADIO if is_radio else MediaType.TRACK,
- data={"url": item_id},
+ stream_type=StreamType.HTTP,
+ path=item_id,
+ can_seek=not is_radio,
)
-
- async def get_audio_stream(
- self, streamdetails: StreamDetails, seek_position: int = 0
- ) -> AsyncGenerator[bytes, None]:
- """Return the audio stream for the provider item."""
- if streamdetails.media_type == MediaType.RADIO:
- # radio stream url
- async for chunk in get_radio_stream(
- self.mass, streamdetails.data["url"], streamdetails
- ):
- yield chunk
- elif os.path.isfile(streamdetails.data):
- # local file
- async for chunk in get_file_stream(
- self.mass, streamdetails.data["url"], streamdetails, seek_position
- ):
- yield chunk
- else:
- # regular stream url (without icy meta)
- async for chunk in get_http_stream(
- self.mass, streamdetails.data["url"], streamdetails, seek_position
- ):
- yield chunk
from urllib.parse import unquote
import pytube
-from aiohttp import ClientResponseError
from music_assistant.common.models.config_entries import ConfigEntry, ConfigValueType
-from music_assistant.common.models.enums import ConfigEntryType, ProviderFeature
+from music_assistant.common.models.enums import ConfigEntryType, ProviderFeature, StreamType
from music_assistant.common.models.errors import (
InvalidDataError,
LoginFailed,
Track,
)
from music_assistant.common.models.streamdetails import StreamDetails
-from music_assistant.server.helpers.audio import get_http_stream
from music_assistant.server.helpers.auth import AuthenticationHelper
from music_assistant.server.models.music_provider import MusicProvider
audio_format=AudioFormat(
content_type=ContentType.try_parse(stream_format["mimeType"]),
),
- data=url,
+ stream_type=StreamType.HTTP,
+ path=url,
)
- if (
- track_obj["streamingData"].get("expiresInSeconds")
- and track_obj["streamingData"].get("expiresInSeconds").isdigit()
- ):
- stream_details.expires = time() + int(
- track_obj["streamingData"].get("expiresInSeconds")
- )
- else:
- stream_details.expires = time() + 600
if stream_format.get("audioChannels") and str(stream_format.get("audioChannels")).isdigit():
stream_details.audio_format.channels = int(stream_format.get("audioChannels"))
if stream_format.get("audioSampleRate") and stream_format.get("audioSampleRate").isdigit():
stream_details.audio_format.sample_rate = int(stream_format.get("audioSampleRate"))
return stream_details
- async def get_audio_stream(
- self, streamdetails: StreamDetails, seek_position: int = 0
- ) -> AsyncGenerator[bytes, None]:
- """Return the audio stream for the provider item."""
- is_retry = False
- while True:
- try:
- async for chunk in get_http_stream(
- self.mass, streamdetails.data, streamdetails, seek_position
- ):
- yield chunk
- return
- except ClientResponseError as err:
- if not is_retry and err.status == 403:
- # cipher expired, get a fresh one
- self.logger.warning("Cipher expired, trying to refresh...")
- streamdetails = await self.get_stream_details(streamdetails.item_id)
- continue
- # raise for all other cases or we have already retried
- raise
-
async def _post_data(self, endpoint: str, data: dict[str, str], **kwargs):
"""Post data to the given endpoint."""
await self._check_oauth_token()
"AUDIO_QUALITY_MEDIUM": 2,
"AUDIO_QUALITY_HIGH": 3,
}
+ if "streamingData" not in track_obj:
+ raise MediaNotFoundError("No stream found for this track")
for adaptive_format in track_obj["streamingData"]["adaptiveFormats"]:
if adaptive_format["mimeType"].startswith("audio") and (
not stream_format
):
stream_format = adaptive_format
if stream_format is None:
- msg = "No stream found for this track"
- raise MediaNotFoundError(msg)
+ raise MediaNotFoundError("No stream found for this track")
return stream_format