From 44d9f952fb4e6fb13dbd39375c45072ac9b44822 Mon Sep 17 00:00:00 2001 From: Marcel van der Veldt Date: Mon, 1 Apr 2024 17:21:08 +0200 Subject: [PATCH] Fix issues with streaming YTM and some radio stations (#1193) --- music_assistant/common/models/enums.py | 8 + .../common/models/streamdetails.py | 9 +- music_assistant/server/controllers/players.py | 3 + music_assistant/server/controllers/streams.py | 201 +++++++++--------- music_assistant/server/helpers/audio.py | 176 +++++++-------- music_assistant/server/helpers/process.py | 104 ++++----- .../server/models/music_provider.py | 6 +- .../server/providers/airplay/__init__.py | 1 - .../server/providers/deezer/__init__.py | 3 +- .../providers/filesystem_local/__init__.py | 10 - .../server/providers/filesystem_local/base.py | 6 +- .../server/providers/jellyfin/__init__.py | 25 +-- .../providers/opensubsonic/sonic_provider.py | 9 +- .../server/providers/plex/__init__.py | 18 +- .../server/providers/qobuz/__init__.py | 23 +- .../server/providers/radiobrowser/__init__.py | 16 +- .../server/providers/snapcast/__init__.py | 24 +-- .../server/providers/sonos/player.py | 3 +- .../server/providers/soundcloud/__init__.py | 24 +-- .../server/providers/spotify/__init__.py | 26 +-- .../server/providers/tidal/__init__.py | 15 +- .../server/providers/tunein/__init__.py | 22 +- .../server/providers/url/__init__.py | 41 +--- .../server/providers/ytmusic/__init__.py | 42 +--- 24 files changed, 324 insertions(+), 491 deletions(-) diff --git a/music_assistant/common/models/enums.py b/music_assistant/common/models/enums.py index 2c696125..75a2d53a 100644 --- a/music_assistant/common/models/enums.py +++ b/music_assistant/common/models/enums.py @@ -397,3 +397,11 @@ class ConfigEntryType(StrEnum): def _missing_(cls: Self, value: object) -> Self: # noqa: ARG003 """Set default enum member if an unknown value is provided.""" return cls.UNKNOWN + + +class StreamType(StrEnum): + """Enum for the type of streamdetails.""" + + HTTP = "http" + LOCAL_FILE = "local_file" + CUSTOM = "custom" diff --git a/music_assistant/common/models/streamdetails.py b/music_assistant/common/models/streamdetails.py index b87a5db7..230c5acf 100644 --- a/music_assistant/common/models/streamdetails.py +++ b/music_assistant/common/models/streamdetails.py @@ -3,12 +3,11 @@ from __future__ import annotations from dataclasses import dataclass -from time import time from typing import Any from mashumaro import DataClassDictMixin -from music_assistant.common.models.enums import MediaType +from music_assistant.common.models.enums import MediaType, StreamType from music_assistant.common.models.media_items import AudioFormat @@ -36,6 +35,8 @@ class StreamDetails(DataClassDictMixin): item_id: str audio_format: AudioFormat media_type: MediaType = MediaType.TRACK + stream_type: StreamType = StreamType.CUSTOM + path: str | None = None # stream_title: radio streams can optionally set this field stream_title: str | None = None @@ -43,14 +44,14 @@ class StreamDetails(DataClassDictMixin): duration: int | None = None # total size in bytes of the item, calculated at eof when omitted size: int | None = None - # expires: timestamp this streamdetails expire - expires: float = time() + 3600 # data: provider specific data (not exposed externally) # this info is for example used to pass details to the get_audio_stream data: Any = None # can_seek: bool to indicate that the providers 'get_audio_stream' supports seeking of the item can_seek: bool = True + # stream_type: + # the fields below will be set/controlled by the streamcontroller seek_position: int = 0 fade_in: bool = False diff --git a/music_assistant/server/controllers/players.py b/music_assistant/server/controllers/players.py index b407e2f6..52a09eea 100644 --- a/music_assistant/server/controllers/players.py +++ b/music_assistant/server/controllers/players.py @@ -28,6 +28,7 @@ from music_assistant.common.models.enums import ( PlayerType, ProviderFeature, ProviderType, + StreamType, ) from music_assistant.common.models.errors import ( AlreadyRegisteredError, @@ -1183,8 +1184,10 @@ class PlayerController(CoreController): audio_format=AudioFormat( content_type=ContentType.try_parse(url), ), + stream_type=StreamType.HTTP, media_type=MediaType.ANNOUNCEMENT, data={"url": url, "use_pre_announce": use_pre_announce}, + path=url, target_loudness=-10, ), ) diff --git a/music_assistant/server/controllers/streams.py b/music_assistant/server/controllers/streams.py index 826d5fd1..1516250a 100644 --- a/music_assistant/server/controllers/streams.py +++ b/music_assistant/server/controllers/streams.py @@ -15,7 +15,7 @@ import time import urllib.parse from collections.abc import AsyncGenerator from contextlib import suppress -from typing import TYPE_CHECKING, Any +from typing import TYPE_CHECKING import shortuuid from aiofiles.os import wrap @@ -27,8 +27,8 @@ from music_assistant.common.models.config_entries import ( ConfigValueOption, ConfigValueType, ) -from music_assistant.common.models.enums import ConfigEntryType, ContentType, MediaType -from music_assistant.common.models.errors import AudioError, QueueEmpty +from music_assistant.common.models.enums import ConfigEntryType, ContentType, MediaType, StreamType +from music_assistant.common.models.errors import QueueEmpty from music_assistant.common.models.media_items import AudioFormat from music_assistant.common.models.streamdetails import StreamDetails from music_assistant.constants import ( @@ -41,19 +41,20 @@ from music_assistant.constants import ( CONF_PUBLISH_IP, SILENCE_FILE, UGP_PREFIX, - VERBOSE_LOG_LEVEL, ) from music_assistant.server.helpers.audio import LOGGER as AUDIO_LOGGER from music_assistant.server.helpers.audio import ( + FFMpeg, check_audio_support, crossfade_pcm_parts, - get_ffmpeg_args, get_ffmpeg_stream, + get_hls_stream, + get_icy_stream, get_player_filter_params, parse_loudnorm, + resolve_radio_stream, strip_silence, ) -from music_assistant.server.helpers.process import AsyncProcess from music_assistant.server.helpers.util import get_ips from music_assistant.server.helpers.webserver import Webserver from music_assistant.server.models.core_controller import CoreController @@ -985,8 +986,8 @@ class StreamsController(CoreController): strip_silence_end = False # pcm_sample_size = chunk size = 1 second of pcm audio pcm_sample_size = pcm_format.pcm_sample_size - buffer_size_begin = pcm_sample_size * 2 if strip_silence_begin else pcm_sample_size - buffer_size_end = pcm_sample_size * 5 if strip_silence_end else pcm_sample_size + buffer_size_begin = pcm_sample_size * 3 if strip_silence_begin else pcm_sample_size * 2 + buffer_size_end = pcm_sample_size * 6 if strip_silence_end else pcm_sample_size * 2 # collect all arguments for ffmpeg filter_params = [] @@ -1003,95 +1004,110 @@ class StreamsController(CoreController): if streamdetails.fade_in: filter_params.append("afade=type=in:start_time=0:duration=3") - ffmpeg_args = get_ffmpeg_args( + if streamdetails.stream_type == StreamType.CUSTOM: + audio_source = self.mass.get_provider(streamdetails.provider).get_audio_stream( + streamdetails, + seek_position=streamdetails.seek_position, + ) + elif streamdetails.media_type == MediaType.RADIO: + resolved_url, supports_icy, is_hls = await resolve_radio_stream( + self.mass, streamdetails.path + ) + if supports_icy: + audio_source = get_icy_stream(self.mass, resolved_url, streamdetails) + elif is_hls: + audio_source = get_hls_stream(self.mass, resolved_url, streamdetails) + else: + audio_source = resolved_url + else: + audio_source = streamdetails.path + extra_input_args = [] + if streamdetails.seek_position and streamdetails.stream_type != StreamType.CUSTOM: + extra_input_args += ["-ss", str(int(streamdetails.seek_position))] + logger.debug("start media stream for: %s", streamdetails.uri) + state_data = {"finished": asyncio.Event(), "bytes_sent": 0} + + async with FFMpeg( + audio_input=audio_source, input_format=streamdetails.audio_format, output_format=pcm_format, filter_params=filter_params, - input_path="-", - # loglevel info is needed for loudness measurement - loglevel="info", # we criple ffmpeg a bit on purpose with the filter_threads # option so it doesn't consume all cpu when calculating loudnorm - extra_input_args=["-filter_threads", "1"], - ) + extra_input_args=[*extra_input_args, "-filter_threads", "1"], + name="ffmpeg_media_stream", + stderr_enabled=True, + ) as ffmpeg_proc: - async def log_reader(ffmpeg_proc: AsyncProcess, state_data: dict[str, Any]): - # To prevent stderr locking up, we must keep reading it - stderr_data = "" - async for line in ffmpeg_proc.iter_stderr(): - line = line.decode().strip() # noqa: PLW2901 - if not line: - continue - logger.log(VERBOSE_LOG_LEVEL, line) - # if streamdetails contenttype is unknown, try parse it from the ffmpeg log output - # this has no actual usecase, other than displaying the correct codec in the UI - if ( - streamdetails.audio_format.content_type == ContentType.UNKNOWN - and line.startswith("Stream #0:0: Audio: ") - ): - streamdetails.audio_format.content_type = ContentType.try_parse( - line.split("Stream #0:0: Audio: ")[1].split(" ")[0] - ) - if stderr_data or "loudnorm" in line: - stderr_data += line - elif "HTTP error" in line: - logger.warning(line) - del line - - # if we reach this point, the process is finished (finish or aborted) - if ffmpeg_proc.returncode == 0: - await state_data["finished"].wait() - finished = ffmpeg_proc.returncode == 0 and state_data["finished"].is_set() - bytes_sent = state_data["bytes_sent"] - seconds_streamed = bytes_sent / pcm_format.pcm_sample_size if bytes_sent else 0 - streamdetails.seconds_streamed = seconds_streamed - state_str = "finished" if finished else "aborted" - logger.debug( - "stream %s for: %s (%s seconds streamed, exitcode %s)", - state_str, - streamdetails.uri, - seconds_streamed, - ffmpeg_proc.returncode, - ) - # store accurate duration - if finished: - streamdetails.duration = streamdetails.seek_position + seconds_streamed - - # parse loudnorm data if we have that collected - if stderr_data and (loudness_details := parse_loudnorm(stderr_data)): - required_seconds = 600 if streamdetails.media_type == MediaType.RADIO else 120 - if finished or (seconds_streamed >= required_seconds): - logger.debug( - "Loudness measurement for %s: %s", streamdetails.uri, loudness_details - ) - streamdetails.loudness = loudness_details - await self.mass.music.set_track_loudness( - streamdetails.item_id, streamdetails.provider, loudness_details - ) + async def log_reader(): + # To prevent stderr locking up, we must keep reading it + stderr_data = "" + async for line in ffmpeg_proc.iter_stderr(): + if "error" in line or "warning" in line: + logger.warning(line) + elif "critical" in line: + logger.critical(line) + elif ( + streamdetails.audio_format.content_type == ContentType.UNKNOWN + and line.startswith("Stream #0:0: Audio: ") + ): + # if streamdetails contenttype is unknown, try parse it from the ffmpeg log + streamdetails.audio_format.content_type = ContentType.try_parse( + line.split("Stream #0:0: Audio: ")[1].split(" ")[0] + ) + elif stderr_data or "loudnorm" in line: + stderr_data += line + else: + logger.debug(line) + del line + + # if we reach this point, the process is finished (completed or aborted) + if ffmpeg_proc.returncode == 0: + await state_data["finished"].wait() + finished = state_data["finished"].is_set() + bytes_sent = state_data["bytes_sent"] + seconds_streamed = bytes_sent / pcm_format.pcm_sample_size if bytes_sent else 0 + streamdetails.seconds_streamed = seconds_streamed + state_str = "finished" if finished else "aborted" + logger.debug( + "stream %s for: %s (%s seconds streamed, exitcode %s)", + state_str, + streamdetails.uri, + seconds_streamed, + ffmpeg_proc.returncode, + ) + # store accurate duration + if finished and not streamdetails.seek_position: + streamdetails.duration = seconds_streamed + + # parse loudnorm data if we have that collected + if stderr_data and (loudness_details := parse_loudnorm(stderr_data)): + required_seconds = 600 if streamdetails.media_type == MediaType.RADIO else 120 + if finished or (seconds_streamed >= required_seconds): + logger.debug( + "Loudness measurement for %s: %s", streamdetails.uri, loudness_details + ) + streamdetails.loudness = loudness_details + await self.mass.music.set_track_loudness( + streamdetails.item_id, streamdetails.provider, loudness_details + ) - # report playback - if finished or seconds_streamed > 30: - self.mass.create_task( - self.mass.music.mark_item_played( - streamdetails.media_type, streamdetails.item_id, streamdetails.provider + # report playback + # TODO: Move this to the queue controller ? + if finished or seconds_streamed > 30: + self.mass.create_task( + self.mass.music.mark_item_played( + streamdetails.media_type, streamdetails.item_id, streamdetails.provider + ) ) - ) - if music_prov := self.mass.get_provider(streamdetails.provider): - self.mass.create_task(music_prov.on_streamed(streamdetails, seconds_streamed)) - # cleanup - del stderr_data - - audio_source = self.mass.get_provider(streamdetails.provider).get_audio_stream( - streamdetails, - seek_position=streamdetails.seek_position, - ) - async with AsyncProcess( - ffmpeg_args, stdin=audio_source, stdout=True, stderr=True, name="ffmpeg_media_stream" - ) as ffmpeg_proc: - state_data = {"finished": asyncio.Event(), "bytes_sent": 0} - logger.debug("start media stream for: %s", streamdetails.uri) + if music_prov := self.mass.get_provider(streamdetails.provider): + self.mass.create_task( + music_prov.on_streamed(streamdetails, seconds_streamed) + ) + # cleanup + del stderr_data - self.mass.create_task(log_reader(ffmpeg_proc, state_data)) + self.mass.create_task(log_reader()) # get pcm chunks from stdout # we always stay buffer_size of bytes behind @@ -1108,8 +1124,8 @@ class StreamsController(CoreController): # buffer is not full enough, move on continue - if strip_silence_begin and chunk_num == 2: - # first 2 chunks received, strip silence of beginning + if strip_silence_begin and chunk_num == 3: + # first 3 chunks received, strip silence of beginning stripped_audio = await strip_silence( self.mass, buffer, @@ -1130,11 +1146,6 @@ class StreamsController(CoreController): yield subchunk del subchunk - # if we did not receive any data, something went (terribly) wrong - # raise here to prevent an (endless) loop elsewhere - if state_data["bytes_sent"] == 0: - raise AudioError(f"stream error on {streamdetails.uri}") - # all chunks received, strip silence of last part if needed and yield remaining bytes if strip_silence_end: final_chunk = await strip_silence( diff --git a/music_assistant/server/helpers/audio.py b/music_assistant/server/helpers/audio.py index 6c1ef7d6..c07e6095 100644 --- a/music_assistant/server/helpers/audio.py +++ b/music_assistant/server/helpers/audio.py @@ -9,11 +9,10 @@ import re import struct from collections import deque from io import BytesIO -from time import time from typing import TYPE_CHECKING import aiofiles -from aiohttp import ClientResponseError, ClientTimeout +from aiohttp import ClientTimeout from music_assistant.common.helpers.global_cache import ( get_global_cache_value, @@ -59,8 +58,45 @@ LOGGER = logging.getLogger(f"{MASS_LOGGER_NAME}.audio") # pylint:disable=consider-using-f-string,too-many-locals,too-many-statements # ruff: noqa: PLR0915 -VLC_HEADERS = {"User-Agent": "VLC/3.0.2.LibVLC/3.0.2"} -VLC_HEADERS_ICY = {**VLC_HEADERS, "Icy-MetaData": "1"} +HTTP_HEADERS = {"User-Agent": "Lavf/60.16.100.MusicAssistant"} +HTTP_HEADERS_ICY = {**HTTP_HEADERS, "Icy-MetaData": "1"} + + +class FFMpeg(AsyncProcess): + """FFMpeg wrapped as AsyncProcess.""" + + def __init__( + self, + audio_input: AsyncGenerator[bytes, None] | str | int, + input_format: AudioFormat, + output_format: AudioFormat, + filter_params: list[str] | None = None, + extra_args: list[str] | None = None, + extra_input_args: list[str] | None = None, + name: str = "ffmpeg", + stderr_enabled: bool = False, + audio_output: str | int = "-", + ) -> None: + """Initialize AsyncProcess.""" + ffmpeg_args = get_ffmpeg_args( + input_format=input_format, + output_format=output_format, + filter_params=filter_params or [], + extra_args=extra_args or [], + input_path=audio_input if isinstance(audio_input, str) else "-", + output_path=audio_output if isinstance(audio_output, str) else "-", + extra_input_args=extra_input_args or [], + loglevel="info" + if stderr_enabled or LOGGER.isEnabledFor(VERBOSE_LOG_LEVEL) + else "error", + ) + super().__init__( + ffmpeg_args, + stdin=True if isinstance(audio_input, str) else audio_input, + stdout=True if isinstance(audio_output, str) else audio_output, + stderr=stderr_enabled, + name=name, + ) async def crossfade_pcm_parts( @@ -201,10 +237,10 @@ async def get_stream_details( Do not try to request streamdetails in advance as this is expiring data. param media_item: The QueueItem for which to request the streamdetails for. """ - if queue_item.streamdetails and (time() + 60) < queue_item.streamdetails.expires: + if queue_item.streamdetails and seek_position: LOGGER.debug(f"Using (pre)cached streamdetails from queue_item for {queue_item.uri}") - # we already have (fresh) streamdetails stored on the queueitem, use these. - # this happens for example while seeking in a track. + # we already have (fresh?) streamdetails stored on the queueitem, use these. + # only do this when we're seeking. # we create a copy (using to/from dict) to ensure the one-time values are cleared streamdetails = StreamDetails.from_dict(queue_item.streamdetails.to_dict()) else: @@ -222,13 +258,6 @@ async def get_stream_details( if not music_prov: LOGGER.debug(f"Skipping {prov_media} - provider not available") continue # provider not available ? - # prefer cache - item_key = f"{music_prov.lookup_key}/{prov_media.item_id}" - cache_key = f"cached_streamdetails_{item_key}" - if cache := await mass.cache.get(cache_key): - if time() + 60 < cache["expires"]: - LOGGER.debug(f"Using cached streamdetails for {item_key}") - streamdetails = StreamDetails.from_dict(cache) # get streamdetails from provider try: streamdetails: StreamDetails = await music_prov.get_stream_details( @@ -237,9 +266,6 @@ async def get_stream_details( except MusicAssistantError as err: LOGGER.warning(str(err)) else: - # store streamdetails in cache - expiration = streamdetails.expires - time() - await mass.cache.set(cache_key, streamdetails.to_dict(), expiration=expiration - 60) break else: raise MediaNotFoundError(f"Unable to retrieve streamdetails for {queue_item}") @@ -322,9 +348,7 @@ def create_wave_header(samplerate=44100, channels=2, bitspersample=16, duration= return file.getvalue() -async def resolve_radio_stream( - mass: MusicAssistant, url: str, use_get: bool = False -) -> tuple[str, bool, bool]: +async def resolve_radio_stream(mass: MusicAssistant, url: str) -> tuple[str, bool, bool]: """ Resolve a streaming radio URL. @@ -345,16 +369,15 @@ async def resolve_radio_stream( resolved_url = url timeout = ClientTimeout(total=0, connect=10, sock_read=5) try: - method = "GET" if use_get else "HEAD" - async with mass.http_session.request( - method, url, headers=VLC_HEADERS_ICY, allow_redirects=True, timeout=timeout + async with mass.http_session.get( + url, headers=HTTP_HEADERS_ICY, allow_redirects=True, timeout=timeout ) as resp: resolved_url = str(resp.real_url) headers = resp.headers resp.raise_for_status() if not resp.headers: raise InvalidDataError("no headers found") - supports_icy = headers.get("icy-name") is not None or "Icecast" in headers.get("server", "") + supports_icy = headers.get("icy-metaint") is not None is_hls = headers.get("content-type") in HLS_CONTENT_TYPES if ( base_url.endswith((".m3u", ".m3u8", ".pls")) @@ -371,9 +394,7 @@ async def resolve_radio_stream( except IsHLSPlaylist: is_hls = True - except (ClientResponseError, InvalidDataError) as err: - if not use_get: - return await resolve_radio_stream(mass, resolved_url, True) + except Exception as err: LOGGER.warning("Error while parsing radio URL %s: %s", url, err) return (resolved_url, supports_icy, is_hls) @@ -383,33 +404,13 @@ async def resolve_radio_stream( return result -async def get_radio_stream( - mass: MusicAssistant, url: str, streamdetails: StreamDetails -) -> AsyncGenerator[bytes, None]: - """Get radio audio stream from HTTP, including metadata retrieval.""" - resolved_url, supports_icy, is_hls = await resolve_radio_stream(mass, url) - # handle special HLS stream - if is_hls: - async for chunk in get_hls_stream(mass, resolved_url, streamdetails): - yield chunk - return - # handle http stream supports icy metadata - if supports_icy: - async for chunk in get_icy_stream(mass, resolved_url, streamdetails): - yield chunk - return - # generic http stream (without icy metadata) - async for chunk in get_http_stream(mass, resolved_url, streamdetails): - yield chunk - - async def get_icy_stream( mass: MusicAssistant, url: str, streamdetails: StreamDetails ) -> AsyncGenerator[bytes, None]: """Get (radio) audio stream from HTTP, including ICY metadata retrieval.""" timeout = ClientTimeout(total=0, connect=30, sock_read=5 * 60) LOGGER.debug("Start streaming radio with ICY metadata from url %s", url) - async with mass.http_session.get(url, headers=VLC_HEADERS_ICY, timeout=timeout) as resp: + async with mass.http_session.get(url, headers=HTTP_HEADERS_ICY, timeout=timeout) as resp: headers = resp.headers meta_int = int(headers["icy-metaint"]) while True: @@ -440,7 +441,7 @@ async def get_hls_stream( timeout = ClientTimeout(total=0, connect=30, sock_read=5 * 60) # fetch master playlist and select (best) child playlist # https://datatracker.ietf.org/doc/html/draft-pantos-http-live-streaming-19#section-10 - async with mass.http_session.get(url, headers=VLC_HEADERS, timeout=timeout) as resp: + async with mass.http_session.get(url, headers=HTTP_HEADERS, timeout=timeout) as resp: charset = resp.charset or "utf-8" master_m3u_data = await resp.text(charset) substreams = parse_m3u(master_m3u_data) @@ -471,7 +472,7 @@ async def get_hls_stream( has_id3_metadata: bool | None = None while True: async with mass.http_session.get( - substream_url, headers=VLC_HEADERS, timeout=timeout + substream_url, headers=HTTP_HEADERS, timeout=timeout ) as resp: charset = resp.charset or "utf-8" substream_m3u_data = await resp.text(charset) @@ -487,7 +488,7 @@ async def get_hls_stream( chunk_item_url = base_path + "/" + chunk_item.path # handle (optional) in-playlist (timed) metadata if has_playlist_metadata is None: - has_playlist_metadata = chunk_item.title is not None + has_playlist_metadata = chunk_item.title not in (None, "") logger.debug("Station support for in-playlist metadata: %s", has_playlist_metadata) if has_playlist_metadata and chunk_item.title != "no desc": # bbc (and maybe others?) set the title to 'no desc' @@ -496,7 +497,7 @@ async def get_hls_stream( # prevent that we play this chunk again if we loop through prev_chunks.append(chunk_item.path) async with mass.http_session.get( - chunk_item_url, headers=VLC_HEADERS, timeout=timeout + chunk_item_url, headers=HTTP_HEADERS, timeout=timeout ) as resp: async for chunk in resp.content.iter_any(): yield chunk @@ -522,13 +523,13 @@ async def get_http_stream( # try to get filesize with a head request seek_supported = streamdetails.can_seek if seek_position or not streamdetails.size: - async with mass.http_session.head(url, headers=VLC_HEADERS) as resp: + async with mass.http_session.head(url, headers=HTTP_HEADERS) as resp: resp.raise_for_status() if size := resp.headers.get("Content-Length"): streamdetails.size = int(size) seek_supported = resp.headers.get("Accept-Ranges") == "bytes" # headers - headers = {**VLC_HEADERS} + headers = {**HTTP_HEADERS} timeout = ClientTimeout(total=0, connect=30, sock_read=5 * 60) skip_bytes = 0 if seek_position and streamdetails.size: @@ -545,23 +546,13 @@ async def get_http_stream( ContentType.M4B, ) ): - LOGGER.debug( - "Seeking in %s (%s) not possible, fallback to ffmpeg seeking.", + LOGGER.warning( + "Seeking in %s (%s) not possible.", streamdetails.uri, streamdetails.audio_format.output_format_str, ) - async for chunk in get_ffmpeg_stream( - url, - # we must set the input content type to unknown to - # enforce ffmpeg to determine it from the headers - input_format=AudioFormat(content_type=ContentType.UNKNOWN), - # enforce wav as we dont want to re-encode lossy formats - # choose wav so we have descriptive headers and move on - output_format=AudioFormat(content_type=ContentType.WAV), - extra_input_args=["-ss", str(seek_position)], - ): - yield chunk - return + seek_position = 0 + streamdetails.seek_position = 0 # start the streaming from http bytes_received = 0 @@ -608,23 +599,13 @@ async def get_file_stream( ContentType.MP4, ) ): - LOGGER.debug( - "Seeking in %s (%s) not possible, fallback to ffmpeg seeking.", + LOGGER.warning( + "Seeking in %s (%s) not possible.", streamdetails.uri, streamdetails.audio_format.output_format_str, ) - async for chunk in get_ffmpeg_stream( - filename, - # we must set the input content type to unknown to - # enforce ffmpeg to determine it from the headers - input_format=AudioFormat(content_type=ContentType.UNKNOWN), - # enforce wav as we dont want to re-encode lossy formats - # choose wav so we have descriptive headers and move on - output_format=AudioFormat(content_type=ContentType.WAV), - extra_input_args=["-ss", str(seek_position)], - ): - yield chunk - return + seek_position = 0 + streamdetails.seek_position = 0 chunk_size = get_chunksize(streamdetails.audio_format) async with aiofiles.open(streamdetails.data, "rb") as _file: @@ -646,9 +627,8 @@ async def get_ffmpeg_stream( filter_params: list[str] | None = None, extra_args: list[str] | None = None, chunk_size: int | None = None, - ffmpeg_loglevel: str = "info", extra_input_args: list[str] | None = None, - logger: logging.Logger | None = None, + name: str = "ffmpeg", ) -> AsyncGenerator[bytes, None]: """ Get the ffmpeg audio stream as async generator. @@ -656,27 +636,18 @@ async def get_ffmpeg_stream( Takes care of resampling and/or recoding if needed, according to player preferences. """ - ffmpeg_args = get_ffmpeg_args( + async with FFMpeg( + audio_input=audio_input, input_format=input_format, output_format=output_format, - filter_params=filter_params or [], - extra_args=extra_args or [], - input_path=audio_input if isinstance(audio_input, str) else "-", - output_path="-", - loglevel=ffmpeg_loglevel, - extra_input_args=extra_input_args or [], - ) - stdin = audio_input if not isinstance(audio_input, str) else True - async with AsyncProcess( - ffmpeg_args, - stdin=stdin, - stdout=True, - stderr=logger or LOGGER.getChild("ffmpeg_stream"), - name="ffmpeg_stream", + filter_params=filter_params, + extra_args=extra_args, + extra_input_args=extra_input_args, + name=name, ) as ffmpeg_proc: # read final chunks from stdout - chunk_size = chunk_size or get_chunksize(output_format, 1) - async for chunk in ffmpeg_proc.iter_chunked(chunk_size): + iterator = ffmpeg_proc.iter_chunked(chunk_size) if chunk_size else ffmpeg_proc.iter_any() + async for chunk in iterator: yield chunk @@ -809,8 +780,8 @@ def get_ffmpeg_args( extra_args: list[str] | None = None, input_path: str = "-", output_path: str = "-", - loglevel: str = "info", extra_input_args: list[str] | None = None, + loglevel: str = "error", ) -> list[str]: """Collect all args to send to the ffmpeg process.""" if extra_args is None: @@ -833,6 +804,7 @@ def get_ffmpeg_args( "-hide_banner", "-loglevel", loglevel, + "-nostats", "-ignore_unknown", "-protocol_whitelist", "file,http,https,tcp,tls,crypto,pipe,data,fd", diff --git a/music_assistant/server/helpers/process.py b/music_assistant/server/helpers/process.py index 532de822..9dc88511 100644 --- a/music_assistant/server/helpers/process.py +++ b/music_assistant/server/helpers/process.py @@ -19,7 +19,7 @@ from signal import SIGINT from types import TracebackType from typing import TYPE_CHECKING -from music_assistant.constants import MASS_LOGGER_NAME, VERBOSE_LOG_LEVEL +from music_assistant.constants import MASS_LOGGER_NAME LOGGER = logging.getLogger(f"{MASS_LOGGER_NAME}.helpers.process") @@ -42,16 +42,16 @@ class AsyncProcess: args: list[str], stdin: bool | int | AsyncGenerator[bytes, None] | None = None, stdout: bool | int | None = None, - stderr: bool | int | logging.Logger | None = None, + stderr: bool | int | None = False, name: str | None = None, ) -> None: """Initialize AsyncProcess.""" self.proc: asyncio.subprocess.Process | None = None - if isinstance(stderr, logging.Logger): - self._stderr_logger = stderr - stderr = asyncio.subprocess.PIPE - else: - self._stderr_logger = None + if name is None: + name = args[0].split(os.sep)[-1] + self.name = name + self.attached_tasks: list[asyncio.Task] = [] + self.logger = LOGGER.getChild(name) self._args = args self._stdin = stdin self._stdout = stdout @@ -61,10 +61,6 @@ class AsyncProcess: self._stderr_enabled = stderr not in (None, False) self._close_called = False self._returncode: bool | None = None - if name is None: - name = self._args[0].split(os.sep)[-1] - self.name = name - self.attached_tasks: list[asyncio.Task] = [] @property def closed(self) -> bool: @@ -100,29 +96,17 @@ class AsyncProcess: async def start(self) -> None: """Perform Async init of process.""" - if self._stdin is True or isinstance(self._stdin, AsyncGenerator): - stdin = asyncio.subprocess.PIPE - else: - stdin = self._stdin - if self._stdout is True or isinstance(self._stdout, AsyncGenerator): - stdout = asyncio.subprocess.PIPE - else: - stdout = self._stdout - if self._stderr is True or isinstance(self._stderr, AsyncGenerator): - stderr = asyncio.subprocess.PIPE - else: - stderr = self._stderr self.proc = await asyncio.create_subprocess_exec( *self._args, - stdin=stdin if self._stdin_enabled else None, - stdout=stdout if self._stdout_enabled else None, - stderr=stderr if self._stderr_enabled else None, + stdin=asyncio.subprocess.PIPE + if (self._stdin is True or isinstance(self._stdin, AsyncGenerator)) + else self._stdin, + stdout=asyncio.subprocess.PIPE if self._stdout is True else self._stdout, + stderr=asyncio.subprocess.PIPE if self._stderr is True else self._stderr, ) - LOGGER.debug("Process %s started with PID %s", self.name, self.proc.pid) - if not isinstance(self._stdin, int | None): + self.logger.debug("Process %s started with PID %s", self.name, self.proc.pid) + if isinstance(self._stdin, AsyncGenerator): self.attached_tasks.append(asyncio.create_task(self._feed_stdin())) - if self._stderr_logger: - self.attached_tasks.append(asyncio.create_task(self._read_stderr())) async def iter_chunked(self, n: int = DEFAULT_CHUNKSIZE) -> AsyncGenerator[bytes, None]: """Yield chunks of n size from the process stdout.""" @@ -213,13 +197,13 @@ class AsyncProcess: if self.returncode is not None: break except TimeoutError: - LOGGER.debug( + self.logger.debug( "Process %s with PID %s did not stop in time. Sending terminate...", self.name, self.proc.pid, ) self.proc.terminate() - LOGGER.debug( + self.logger.debug( "Process %s with PID %s stopped with returncode %s", self.name, self.proc.pid, @@ -240,25 +224,6 @@ class AsyncProcess: self._returncode = self.proc.returncode return (stdout, stderr) - async def iter_stderr(self) -> AsyncGenerator[bytes, None]: - """Iterate lines from the stderr stream.""" - while self.returncode is None: - try: - line = await self.proc.stderr.readline() - if line == b"": - break - yield line - except ValueError as err: - # we're waiting for a line (separator found), but the line was too big - # this may happen with ffmpeg during a long (radio) stream where progress - # gets outputted to the stderr but no newline - # https://stackoverflow.com/questions/55457370/how-to-avoid-valueerror-separator-is-not-found-and-chunk-exceed-the-limit - # NOTE: this consumes the line that was too big - if "chunk exceed the limit" in str(err): - continue - # raise for all other (value) errors - raise - async def _feed_stdin(self) -> None: """Feed stdin with chunks from an AsyncGenerator.""" if TYPE_CHECKING: @@ -269,24 +234,39 @@ class AsyncProcess: return await self.write(chunk) await self.write_eof() - except asyncio.CancelledError: + except Exception as err: + if not isinstance(err, asyncio.CancelledError): + self.logger.exception(err) # make sure the stdin generator is also properly closed # by propagating a cancellederror within task = asyncio.create_task(self._stdin.__anext__()) task.cancel() - async def _read_stderr(self) -> None: - """Read stderr and log to logger.""" - async for line in self.iter_stderr(): - line = line.decode().strip() # noqa: PLW2901 + async def read_stderr(self) -> bytes: + """Read line from stderr.""" + try: + return await self.proc.stderr.readline() + except ValueError as err: + # we're waiting for a line (separator found), but the line was too big + # this may happen with ffmpeg during a long (radio) stream where progress + # gets outputted to the stderr but no newline + # https://stackoverflow.com/questions/55457370/how-to-avoid-valueerror-separator-is-not-found-and-chunk-exceed-the-limit + # NOTE: this consumes the line that was too big + if "chunk exceed the limit" in str(err): + return await self.proc.stderr.readline() + # raise for all other (value) errors + raise + + async def iter_stderr(self) -> AsyncGenerator[str, None]: + """Iterate lines from the stderr stream as string.""" + while True: + line = await self.read_stderr() + if line == b"": + break + line = line.decode().strip() if not line: continue - if "error" in line.lower(): - self._stderr_logger.error(line) - elif "warning" in line.lower(): - self._stderr_logger.warning(line) - else: - self._stderr_logger.log(VERBOSE_LOG_LEVEL, line) + yield line async def check_output(args: str | list[str]) -> tuple[int, bytes]: diff --git a/music_assistant/server/models/music_provider.py b/music_assistant/server/models/music_provider.py index 5300405c..e1b981c8 100644 --- a/music_assistant/server/models/music_provider.py +++ b/music_assistant/server/models/music_provider.py @@ -251,7 +251,11 @@ class MusicProvider(Provider): async def get_audio_stream( # type: ignore[return] self, streamdetails: StreamDetails, seek_position: int = 0 ) -> AsyncGenerator[bytes, None]: - """Return the audio stream for the provider item.""" + """ + Return the (custom) audio stream for the provider item. + + Will only be called when the stream_type is set to CUSTOM. + """ raise NotImplementedError async def on_streamed(self, streamdetails: StreamDetails, seconds_streamed: int) -> None: diff --git a/music_assistant/server/providers/airplay/__init__.py b/music_assistant/server/providers/airplay/__init__.py index 92e343af..107e8d8b 100644 --- a/music_assistant/server/providers/airplay/__init__.py +++ b/music_assistant/server/providers/airplay/__init__.py @@ -265,7 +265,6 @@ class AirplayStream: input_format=self.input_format, output_format=AIRPLAY_PCM_FORMAT, filter_params=get_player_filter_params(self.mass, player_id), - loglevel="fatal", ) self._ffmpeg_proc = AsyncProcess( ffmpeg_args, diff --git a/music_assistant/server/providers/deezer/__init__.py b/music_assistant/server/providers/deezer/__init__.py index 967b670e..e77b5da3 100644 --- a/music_assistant/server/providers/deezer/__init__.py +++ b/music_assistant/server/providers/deezer/__init__.py @@ -26,6 +26,7 @@ from music_assistant.common.models.enums import ( ImageType, MediaType, ProviderFeature, + StreamType, ) from music_assistant.common.models.errors import LoginFailed from music_assistant.common.models.media_items import ( @@ -450,9 +451,9 @@ class DeezerProvider(MusicProvider): # pylint: disable=W0223 audio_format=AudioFormat( content_type=ContentType.try_parse(url_details["format"].split("_")[0]) ), + stream_type=StreamType.CUSTOM, duration=int(song_data["DURATION"]), data={"url": url, "format": url_details["format"]}, - expires=url_details["exp"], size=int(song_data[f"FILESIZE_{url_details['format']}"]), ) diff --git a/music_assistant/server/providers/filesystem_local/__init__.py b/music_assistant/server/providers/filesystem_local/__init__.py index 5695c40f..ebebc14d 100644 --- a/music_assistant/server/providers/filesystem_local/__init__.py +++ b/music_assistant/server/providers/filesystem_local/__init__.py @@ -13,9 +13,7 @@ from aiofiles.os import wrap from music_assistant.common.models.config_entries import ConfigEntry, ConfigValueType from music_assistant.common.models.enums import ConfigEntryType from music_assistant.common.models.errors import SetupFailedError -from music_assistant.common.models.streamdetails import StreamDetails from music_assistant.constants import CONF_PATH -from music_assistant.server.helpers.audio import get_file_stream from .base import ( CONF_ENTRY_MISSING_ALBUM_ARTIST, @@ -169,14 +167,6 @@ class LocalFileSystemProvider(FileSystemProviderBase): abs_path = get_absolute_path(self.base_path, file_path) return await exists(abs_path) - async def get_audio_stream( - self, streamdetails: StreamDetails, seek_position: int = 0 - ) -> AsyncGenerator[bytes, None]: - """Return the audio stream for the provider item.""" - abs_path = get_absolute_path(self.base_path, streamdetails.item_id) - async for chunk in get_file_stream(self.mass, abs_path, streamdetails, seek_position): - yield chunk - async def read_file_content(self, file_path: str, seek: int = 0) -> AsyncGenerator[bytes, None]: """Yield (binary) contents of file in chunks of bytes.""" abs_path = get_absolute_path(self.base_path, file_path) diff --git a/music_assistant/server/providers/filesystem_local/base.py b/music_assistant/server/providers/filesystem_local/base.py index b926837e..ba57c40a 100644 --- a/music_assistant/server/providers/filesystem_local/base.py +++ b/music_assistant/server/providers/filesystem_local/base.py @@ -18,7 +18,7 @@ from music_assistant.common.models.config_entries import ( ConfigEntryType, ConfigValueOption, ) -from music_assistant.common.models.enums import ExternalID, ProviderFeature +from music_assistant.common.models.enums import ExternalID, ProviderFeature, StreamType from music_assistant.common.models.errors import ( InvalidDataError, MediaNotFoundError, @@ -613,9 +613,11 @@ class FileSystemProviderBase(MusicProvider): item_id=item_id, audio_format=prov_mapping.audio_format, media_type=MediaType.TRACK, + stream_type=StreamType.LOCAL_FILE if file_item.local_path else StreamType.CUSTOM, duration=library_item.duration, size=file_item.file_size, - data=file_item.local_path, + data=file_item, + path=file_item.local_path, can_seek=prov_mapping.audio_format.content_type in SEEKABLE_FILES, ) diff --git a/music_assistant/server/providers/jellyfin/__init__.py b/music_assistant/server/providers/jellyfin/__init__.py index dccc2cd4..b999ad27 100644 --- a/music_assistant/server/providers/jellyfin/__init__.py +++ b/music_assistant/server/providers/jellyfin/__init__.py @@ -26,6 +26,7 @@ from music_assistant.common.models.enums import ( ImageType, MediaType, ProviderFeature, + StreamType, ) from music_assistant.common.models.errors import ( InvalidDataError, @@ -52,7 +53,6 @@ from music_assistant.common.models.media_items import Artist as JellyfinArtist from music_assistant.common.models.media_items import Playlist as JellyfinPlaylist from music_assistant.common.models.media_items import Track as JellyfinTrack from music_assistant.common.models.streamdetails import StreamDetails -from music_assistant.server.helpers.audio import get_http_stream if TYPE_CHECKING: from music_assistant.common.models.provider import ProviderManifest @@ -725,21 +725,25 @@ class JellyfinProvider(MusicProvider): jellyfin_track = API.get_item(self._jellyfin_server.jellyfin, item_id) mimetype = self._media_mime_type(jellyfin_track) media_stream = jellyfin_track[ITEM_KEY_MEDIA_STREAMS][0] + url = API.audio_url( + self._jellyfin_server.jellyfin, jellyfin_track[ITEM_KEY_ID], SUPPORTED_CONTAINER_FORMATS + ) if ITEM_KEY_MEDIA_CODEC in media_stream: - media_type = ContentType.try_parse(media_stream[ITEM_KEY_MEDIA_CODEC]) + content_type = ContentType.try_parse(media_stream[ITEM_KEY_MEDIA_CODEC]) else: - media_type = ContentType.try_parse(mimetype) + content_type = ContentType.try_parse(mimetype) return StreamDetails( item_id=jellyfin_track[ITEM_KEY_ID], provider=self.instance_id, audio_format=AudioFormat( - content_type=media_type, + content_type=content_type, channels=jellyfin_track[ITEM_KEY_MEDIA_STREAMS][0][ITEM_KEY_MEDIA_CHANNELS], ), + stream_type=StreamType.HTTP, duration=int( jellyfin_track[ITEM_KEY_RUNTIME_TICKS] / 10000000 ), # 10000000 ticks per millisecond) - data=jellyfin_track, + path=url, ) def _get_thumbnail_url(self, client: JellyfinClient, media_item: dict[str, Any]) -> str | None: @@ -808,14 +812,3 @@ class JellyfinProvider(MusicProvider): mime_type, _ = mimetypes.guess_type(path) return mime_type - - async def get_audio_stream( - self, streamdetails: StreamDetails, seek_position: int = 0 - ) -> AsyncGenerator[bytes, None]: - """Return the audio stream for the provider item.""" - url = API.audio_url( - self._jellyfin_server.jellyfin, streamdetails.item_id, SUPPORTED_CONTAINER_FORMATS - ) - - async for chunk in get_http_stream(self.mass, url, streamdetails, seek_position): - yield chunk diff --git a/music_assistant/server/providers/opensubsonic/sonic_provider.py b/music_assistant/server/providers/opensubsonic/sonic_provider.py index fe34e475..8796f373 100644 --- a/music_assistant/server/providers/opensubsonic/sonic_provider.py +++ b/music_assistant/server/providers/opensubsonic/sonic_provider.py @@ -15,7 +15,13 @@ from libopensonic.errors import ( SonicError, ) -from music_assistant.common.models.enums import ContentType, ImageType, MediaType, ProviderFeature +from music_assistant.common.models.enums import ( + ContentType, + ImageType, + MediaType, + ProviderFeature, + StreamType, +) from music_assistant.common.models.errors import LoginFailed, MediaNotFoundError from music_assistant.common.models.media_items import ( Album, @@ -671,6 +677,7 @@ class OpenSonicProvider(MusicProvider): provider=self.instance_id, can_seek=self._seek_support, audio_format=AudioFormat(content_type=ContentType.try_parse(mime_type)), + stream_type=StreamType.CUSTOM, duration=sonic_song.duration if sonic_song.duration is not None else 0, ) diff --git a/music_assistant/server/providers/plex/__init__.py b/music_assistant/server/providers/plex/__init__.py index 93367f05..7308553c 100644 --- a/music_assistant/server/providers/plex/__init__.py +++ b/music_assistant/server/providers/plex/__init__.py @@ -28,6 +28,7 @@ from music_assistant.common.models.enums import ( ImageType, MediaType, ProviderFeature, + StreamType, ) from music_assistant.common.models.errors import InvalidDataError, LoginFailed, MediaNotFoundError from music_assistant.common.models.media_items import ( @@ -46,7 +47,6 @@ from music_assistant.common.models.media_items import ( Track, ) from music_assistant.common.models.streamdetails import StreamDetails -from music_assistant.server.helpers.audio import get_http_stream from music_assistant.server.helpers.auth import AuthenticationHelper from music_assistant.server.helpers.tags import parse_tags from music_assistant.server.models.music_provider import MusicProvider @@ -755,12 +755,13 @@ class PlexProvider(MusicProvider): content_type=media_type, channels=media.audioChannels, ), + stream_type=StreamType.HTTP, duration=plex_track.duration, data=plex_track, ) if media_type != ContentType.M4A: - stream_details.data = self._plex_server.url(media_part.key, True) + stream_details.path = self._plex_server.url(media_part.key, True) if audio_stream.samplingRate: stream_details.audio_format.sample_rate = audio_stream.samplingRate if audio_stream.bitDepth: @@ -769,7 +770,7 @@ class PlexProvider(MusicProvider): else: url = plex_track.getStreamURL() media_info = await parse_tags(url) - + stream_details.path = url stream_details.audio_format.channels = media_info.channels stream_details.audio_format.content_type = ContentType.try_parse(media_info.format) stream_details.audio_format.sample_rate = media_info.sample_rate @@ -777,17 +778,6 @@ class PlexProvider(MusicProvider): return stream_details - async def get_audio_stream( - self, streamdetails: StreamDetails, seek_position: int = 0 - ) -> AsyncGenerator[bytes, None]: - """Return the audio stream for the provider item.""" - if isinstance(streamdetails.data, str): - url = streamdetails.data - else: - url = streamdetails.data.getStreamURL(offset=seek_position) - async for chunk in get_http_stream(self.mass, url, streamdetails, 0): - yield chunk - async def get_myplex_account_and_refresh_token(self, auth_token: str) -> MyPlexAccount: """Get a MyPlexAccount object and refresh the token if needed.""" diff --git a/music_assistant/server/providers/qobuz/__init__.py b/music_assistant/server/providers/qobuz/__init__.py index dbb07534..0e975730 100644 --- a/music_assistant/server/providers/qobuz/__init__.py +++ b/music_assistant/server/providers/qobuz/__init__.py @@ -13,7 +13,12 @@ from asyncio_throttle import Throttler from music_assistant.common.helpers.util import parse_title_and_version, try_parse_int from music_assistant.common.models.config_entries import ConfigEntry, ConfigValueType -from music_assistant.common.models.enums import ConfigEntryType, ExternalID, ProviderFeature +from music_assistant.common.models.enums import ( + ConfigEntryType, + ExternalID, + ProviderFeature, + StreamType, +) from music_assistant.common.models.errors import LoginFailed, MediaNotFoundError from music_assistant.common.models.media_items import ( Album, @@ -43,7 +48,6 @@ from music_assistant.constants import ( from music_assistant.server.helpers.app_vars import app_var # pylint: enable=no-name-in-module -from music_assistant.server.helpers.audio import get_http_stream from music_assistant.server.models.music_provider import MusicProvider if TYPE_CHECKING: @@ -401,6 +405,7 @@ class QobuzProvider(MusicProvider): else: msg = f"Unsupported mime type for {item_id}" raise MediaNotFoundError(msg) + self.mass.create_task(self._report_playback_started(streamdata)) return StreamDetails( item_id=str(item_id), provider=self.instance_id, @@ -409,22 +414,12 @@ class QobuzProvider(MusicProvider): sample_rate=int(streamdata["sampling_rate"] * 1000), bit_depth=streamdata["bit_depth"], ), + stream_type=StreamType.HTTP, duration=streamdata["duration"], data=streamdata, # we need these details for reporting playback - expires=time.time() + 300, # url expires very fast + path=streamdata["url"], ) - async def get_audio_stream( - self, streamdetails: StreamDetails, seek_position: int = 0 - ) -> AsyncGenerator[bytes, None]: - """Return the audio stream for the provider item.""" - # report playback started as soon as we start streaming - self.mass.create_task(self._report_playback_started(streamdetails.data)) - async for chunk in get_http_stream( - self.mass, streamdetails.data["url"], streamdetails, seek_position - ): - yield chunk - async def _report_playback_started(self, streamdata: dict) -> None: """Report playback start to qobuz.""" # TODO: need to figure out if the streamed track is purchased by user diff --git a/music_assistant/server/providers/radiobrowser/__init__.py b/music_assistant/server/providers/radiobrowser/__init__.py index 5141a397..7adc431f 100644 --- a/music_assistant/server/providers/radiobrowser/__init__.py +++ b/music_assistant/server/providers/radiobrowser/__init__.py @@ -7,7 +7,7 @@ from typing import TYPE_CHECKING from radios import FilterBy, Order, RadioBrowser, RadioBrowserError -from music_assistant.common.models.enums import LinkType, ProviderFeature +from music_assistant.common.models.enums import LinkType, ProviderFeature, StreamType from music_assistant.common.models.media_items import ( AudioFormat, BrowseFolder, @@ -22,7 +22,6 @@ from music_assistant.common.models.media_items import ( SearchResults, ) from music_assistant.common.models.streamdetails import StreamDetails -from music_assistant.server.helpers.audio import get_radio_stream from music_assistant.server.models.music_provider import MusicProvider SUPPORTED_FEATURES = (ProviderFeature.SEARCH, ProviderFeature.BROWSE) @@ -290,14 +289,7 @@ class RadioBrowserProvider(MusicProvider): content_type=ContentType.try_parse(stream.codec), ), media_type=MediaType.RADIO, - data=stream.url_resolved, - expires=time() + 3600, + stream_type=StreamType.HTTP, + path=stream.url_resolved, + can_seek=False, ) - - async def get_audio_stream( - self, streamdetails: StreamDetails, seek_position: int = 0 - ) -> AsyncGenerator[bytes, None]: - """Return the audio stream for the provider item.""" - # report playback started as soon as we start streaming - async for chunk in get_radio_stream(self.mass, streamdetails.data, streamdetails): - yield chunk diff --git a/music_assistant/server/providers/snapcast/__init__.py b/music_assistant/server/providers/snapcast/__init__.py index 041b4582..e42fd6fd 100644 --- a/music_assistant/server/providers/snapcast/__init__.py +++ b/music_assistant/server/providers/snapcast/__init__.py @@ -36,7 +36,7 @@ from music_assistant.common.models.errors import SetupFailedError from music_assistant.common.models.media_items import AudioFormat from music_assistant.common.models.player import DeviceInfo, Player from music_assistant.constants import UGP_PREFIX -from music_assistant.server.helpers.audio import get_ffmpeg_args, get_player_filter_params +from music_assistant.server.helpers.audio import FFMpeg, get_player_filter_params from music_assistant.server.helpers.process import AsyncProcess, check_output from music_assistant.server.models.player_provider import PlayerProvider @@ -359,24 +359,16 @@ class SnapCastProvider(PlayerProvider): stream.set_callback(stream_callback) stream_path = f"tcp://{host}:{port}" self.logger.debug("Start streaming to %s", stream_path) - ffmpeg_args = get_ffmpeg_args( - input_format=input_format, - output_format=DEFAULT_SNAPCAST_FORMAT, - filter_params=get_player_filter_params(self.mass, player_id), - output_path=f"tcp://{host}:{port}", - loglevel="fatal", - ) try: - async with AsyncProcess( - ffmpeg_args, - stdin=True, - stdout=False, - stderr=self.logger.getChild("ffmpeg"), + async with FFMpeg( + audio_input=audio_source, + input_format=input_format, + output_format=DEFAULT_SNAPCAST_FORMAT, + filter_params=get_player_filter_params(self.mass, player_id), name="snapcast_ffmpeg", + audio_output=f"tcp://{host}:{port}", ) as ffmpeg_proc: - async for chunk in audio_source: - await ffmpeg_proc.write(chunk) - await ffmpeg_proc.write_eof() + await ffmpeg_proc.wait() # we need to wait a bit for the stream status to become idle # to ensure that all snapclients have consumed the audio await self.mass.players.wait_for_state(player, PlayerState.IDLE) diff --git a/music_assistant/server/providers/sonos/player.py b/music_assistant/server/providers/sonos/player.py index 4bfeb783..20388f42 100644 --- a/music_assistant/server/providers/sonos/player.py +++ b/music_assistant/server/providers/sonos/player.py @@ -535,7 +535,8 @@ class SonosPlayer: """Handle callback for topology change event.""" if xml := event.variables.get("zone_group_state"): zgs = ET.fromstring(xml) - for vanished_device in zgs.find("VanishedDevices") or []: + vanished_devices = zgs.find("VanishedDevices") or [] + for vanished_device in vanished_devices: if (reason := vanished_device.get("Reason")) not in SUPPORTED_VANISH_REASONS: self.logger.debug( "Ignoring %s marked %s as vanished with reason: %s", diff --git a/music_assistant/server/providers/soundcloud/__init__.py b/music_assistant/server/providers/soundcloud/__init__.py index 5d2385f6..58717255 100644 --- a/music_assistant/server/providers/soundcloud/__init__.py +++ b/music_assistant/server/providers/soundcloud/__init__.py @@ -10,7 +10,7 @@ from soundcloudpy import SoundcloudAsyncAPI from music_assistant.common.helpers.util import parse_title_and_version from music_assistant.common.models.config_entries import ConfigEntry, ConfigValueType -from music_assistant.common.models.enums import ConfigEntryType, ProviderFeature +from music_assistant.common.models.enums import ConfigEntryType, ProviderFeature, StreamType from music_assistant.common.models.errors import InvalidDataError, LoginFailed from music_assistant.common.models.media_items import ( Artist, @@ -26,11 +26,6 @@ from music_assistant.common.models.media_items import ( Track, ) from music_assistant.common.models.streamdetails import StreamDetails -from music_assistant.server.helpers.audio import ( - get_hls_stream, - get_http_stream, - resolve_radio_stream, -) from music_assistant.server.models.music_provider import MusicProvider CONF_CLIENT_ID = "client_id" @@ -315,25 +310,10 @@ class SoundcloudMusicProvider(MusicProvider): audio_format=AudioFormat( content_type=ContentType.try_parse(stream_format), ), + stream_type=StreamType.HTTP, data=url, ) - async def get_audio_stream( - self, streamdetails: StreamDetails, seek_position: int = 0 - ) -> AsyncGenerator[bytes, None]: - """Return the audio stream for the provider item.""" - _, _, is_hls = await resolve_radio_stream(self.mass, streamdetails.data) - if is_hls: - # some soundcloud streams are HLS, prefer the radio streamer - async for chunk in get_hls_stream(self.mass, streamdetails.data, streamdetails): - yield chunk - return - # regular stream from http - async for chunk in get_http_stream( - self.mass, streamdetails.data, streamdetails, seek_position - ): - yield chunk - async def _parse_artist(self, artist_obj: dict) -> Artist: """Parse a Soundcloud user response to Artist model object.""" artist_id = None diff --git a/music_assistant/server/providers/spotify/__init__.py b/music_assistant/server/providers/spotify/__init__.py index 841980f1..80a775ee 100644 --- a/music_assistant/server/providers/spotify/__init__.py +++ b/music_assistant/server/providers/spotify/__init__.py @@ -17,7 +17,12 @@ from asyncio_throttle import Throttler from music_assistant.common.helpers.json import json_loads from music_assistant.common.helpers.util import parse_title_and_version from music_assistant.common.models.config_entries import ConfigEntry, ConfigValueType -from music_assistant.common.models.enums import ConfigEntryType, ExternalID, ProviderFeature +from music_assistant.common.models.enums import ( + ConfigEntryType, + ExternalID, + ProviderFeature, + StreamType, +) from music_assistant.common.models.errors import LoginFailed, MediaNotFoundError from music_assistant.common.models.media_items import ( Album, @@ -396,18 +401,13 @@ class SpotifyProvider(MusicProvider): async def get_stream_details(self, item_id: str) -> StreamDetails: """Return the content details for the given track when it will be streamed.""" - # make sure a valid track is requested. - track = await self.get_track(item_id) return StreamDetails( - item_id=track.item_id, + item_id=item_id, provider=self.instance_id, audio_format=AudioFormat( content_type=ContentType.OGG, ), - duration=track.duration, - # these streamdetails may be cached for a long time, - # as there is no time sensitive info in them - expires=time.time() + 30 * 24 * 3600, + stream_type=StreamType.CUSTOM, ) async def get_audio_stream( @@ -437,16 +437,6 @@ class SpotifyProvider(MusicProvider): yield chunk bytes_sent += len(chunk) - if bytes_sent == 0 and not self._ap_workaround: - # AP resolve failure - # https://github.com/librespot-org/librespot/issues/972 - # retry with ap-port set to invalid value, which will force fallback - args += ["--ap-port", "12345"] - async with AsyncProcess(args, stdout=True) as librespot_proc: - async for chunk in librespot_proc.iter_any(): - yield chunk - self._ap_workaround = True - async def _parse_artist(self, artist_obj): """Parse spotify artist object to generic layout.""" artist = Artist( diff --git a/music_assistant/server/providers/tidal/__init__.py b/music_assistant/server/providers/tidal/__init__.py index 57b7b107..eea1e5de 100644 --- a/music_assistant/server/providers/tidal/__init__.py +++ b/music_assistant/server/providers/tidal/__init__.py @@ -27,6 +27,7 @@ from music_assistant.common.models.enums import ( ImageType, MediaType, ProviderFeature, + StreamType, ) from music_assistant.common.models.errors import LoginFailed, MediaNotFoundError from music_assistant.common.models.media_items import ( @@ -44,7 +45,6 @@ from music_assistant.common.models.media_items import ( Track, ) from music_assistant.common.models.streamdetails import StreamDetails -from music_assistant.server.helpers.audio import get_http_stream from music_assistant.server.helpers.auth import AuthenticationHelper from music_assistant.server.helpers.tags import AudioTags, parse_tags from music_assistant.server.models.music_provider import MusicProvider @@ -439,20 +439,11 @@ class TidalProvider(MusicProvider): bit_depth=media_info.bits_per_sample, channels=media_info.channels, ), + stream_type=StreamType.HTTP, duration=track.duration, - data=url, + path=url, ) - async def get_audio_stream( - self, streamdetails: StreamDetails, seek_position: int = 0 - ) -> AsyncGenerator[bytes, None]: - """Return the audio stream for the provider item.""" - # report playback started as soon as we start streaming - async for chunk in get_http_stream( - self.mass, streamdetails.data, streamdetails, seek_position - ): - yield chunk - async def get_artist(self, prov_artist_id: str) -> Artist: """Get artist details for given artist id.""" tidal_session = await self._get_tidal_session() diff --git a/music_assistant/server/providers/tunein/__init__.py b/music_assistant/server/providers/tunein/__init__.py index af55ab46..408599ec 100644 --- a/music_assistant/server/providers/tunein/__init__.py +++ b/music_assistant/server/providers/tunein/__init__.py @@ -2,14 +2,13 @@ from __future__ import annotations -from time import time from typing import TYPE_CHECKING from asyncio_throttle import Throttler from music_assistant.common.helpers.util import create_sort_name from music_assistant.common.models.config_entries import ConfigEntry, ConfigValueType -from music_assistant.common.models.enums import ConfigEntryType, ProviderFeature +from music_assistant.common.models.enums import ConfigEntryType, ProviderFeature, StreamType from music_assistant.common.models.errors import InvalidDataError, LoginFailed, MediaNotFoundError from music_assistant.common.models.media_items import ( AudioFormat, @@ -22,7 +21,6 @@ from music_assistant.common.models.media_items import ( ) from music_assistant.common.models.streamdetails import StreamDetails from music_assistant.constants import CONF_USERNAME -from music_assistant.server.helpers.audio import get_radio_stream from music_assistant.server.helpers.tags import parse_tags from music_assistant.server.models.music_provider import MusicProvider @@ -227,8 +225,9 @@ class TuneInProvider(MusicProvider): content_type=ContentType.UNKNOWN, ), media_type=MediaType.RADIO, - data=item_id, - expires=time() + 3600, + stream_type=StreamType.HTTP, + path=item_id, + can_seek=False, ) stream_item_id, media_type = item_id.split("--", 1) stream_info = await self.__get_data("Tune.ashx", id=stream_item_id) @@ -242,20 +241,13 @@ class TuneInProvider(MusicProvider): content_type=ContentType(stream["media_type"]), ), media_type=MediaType.RADIO, - data=stream["url"], - expires=time() + 3600, + stream_type=StreamType.HTTP, + path=stream["url"], + can_seek=False, ) msg = f"Unable to retrieve stream details for {item_id}" raise MediaNotFoundError(msg) - async def get_audio_stream( - self, streamdetails: StreamDetails, seek_position: int = 0 - ) -> AsyncGenerator[bytes, None]: - """Return the audio stream for the provider item.""" - # report playback started as soon as we start streaming - async for chunk in get_radio_stream(self.mass, streamdetails.data, streamdetails): - yield chunk - async def __get_data(self, endpoint: str, **kwargs): """Get data from api.""" if endpoint.startswith("http"): diff --git a/music_assistant/server/providers/url/__init__.py b/music_assistant/server/providers/url/__init__.py index 9a4d950f..4003402b 100644 --- a/music_assistant/server/providers/url/__init__.py +++ b/music_assistant/server/providers/url/__init__.py @@ -2,10 +2,9 @@ from __future__ import annotations -import os from typing import TYPE_CHECKING -from music_assistant.common.models.enums import ContentType, ImageType, MediaType +from music_assistant.common.models.enums import ContentType, ImageType, MediaType, StreamType from music_assistant.common.models.errors import MediaNotFoundError from music_assistant.common.models.media_items import ( Artist, @@ -17,18 +16,10 @@ from music_assistant.common.models.media_items import ( Track, ) from music_assistant.common.models.streamdetails import StreamDetails -from music_assistant.server.helpers.audio import ( - get_file_stream, - get_http_stream, - get_radio_stream, - resolve_radio_stream, -) from music_assistant.server.helpers.tags import AudioTags, parse_tags from music_assistant.server.models.music_provider import MusicProvider if TYPE_CHECKING: - from collections.abc import AsyncGenerator - from music_assistant.common.models.config_entries import ( ConfigEntry, ConfigValueType, @@ -179,8 +170,7 @@ class URLProvider(MusicProvider): if cached_info and not force_refresh: return AudioTags.parse(cached_info) # parse info with ffprobe (and store in cache) - resolved_url, _, _ = await resolve_radio_stream(self.mass, url) - media_info = await parse_tags(resolved_url) + media_info = await parse_tags(url) if "authSig" in url: media_info.has_cover_image = False await self.mass.cache.set(cache_key, media_info.raw) @@ -199,28 +189,7 @@ class URLProvider(MusicProvider): bit_depth=media_info.bits_per_sample, ), media_type=MediaType.RADIO if is_radio else MediaType.TRACK, - data={"url": item_id}, + stream_type=StreamType.HTTP, + path=item_id, + can_seek=not is_radio, ) - - async def get_audio_stream( - self, streamdetails: StreamDetails, seek_position: int = 0 - ) -> AsyncGenerator[bytes, None]: - """Return the audio stream for the provider item.""" - if streamdetails.media_type == MediaType.RADIO: - # radio stream url - async for chunk in get_radio_stream( - self.mass, streamdetails.data["url"], streamdetails - ): - yield chunk - elif os.path.isfile(streamdetails.data): - # local file - async for chunk in get_file_stream( - self.mass, streamdetails.data["url"], streamdetails, seek_position - ): - yield chunk - else: - # regular stream url (without icy meta) - async for chunk in get_http_stream( - self.mass, streamdetails.data["url"], streamdetails, seek_position - ): - yield chunk diff --git a/music_assistant/server/providers/ytmusic/__init__.py b/music_assistant/server/providers/ytmusic/__init__.py index 76fc93d2..12a635c8 100644 --- a/music_assistant/server/providers/ytmusic/__init__.py +++ b/music_assistant/server/providers/ytmusic/__init__.py @@ -12,10 +12,9 @@ from typing import TYPE_CHECKING from urllib.parse import unquote import pytube -from aiohttp import ClientResponseError from music_assistant.common.models.config_entries import ConfigEntry, ConfigValueType -from music_assistant.common.models.enums import ConfigEntryType, ProviderFeature +from music_assistant.common.models.enums import ConfigEntryType, ProviderFeature, StreamType from music_assistant.common.models.errors import ( InvalidDataError, LoginFailed, @@ -40,7 +39,6 @@ from music_assistant.common.models.media_items import ( Track, ) from music_assistant.common.models.streamdetails import StreamDetails -from music_assistant.server.helpers.audio import get_http_stream from music_assistant.server.helpers.auth import AuthenticationHelper from music_assistant.server.models.music_provider import MusicProvider @@ -535,44 +533,15 @@ class YoutubeMusicProvider(MusicProvider): audio_format=AudioFormat( content_type=ContentType.try_parse(stream_format["mimeType"]), ), - data=url, + stream_type=StreamType.HTTP, + path=url, ) - if ( - track_obj["streamingData"].get("expiresInSeconds") - and track_obj["streamingData"].get("expiresInSeconds").isdigit() - ): - stream_details.expires = time() + int( - track_obj["streamingData"].get("expiresInSeconds") - ) - else: - stream_details.expires = time() + 600 if stream_format.get("audioChannels") and str(stream_format.get("audioChannels")).isdigit(): stream_details.audio_format.channels = int(stream_format.get("audioChannels")) if stream_format.get("audioSampleRate") and stream_format.get("audioSampleRate").isdigit(): stream_details.audio_format.sample_rate = int(stream_format.get("audioSampleRate")) return stream_details - async def get_audio_stream( - self, streamdetails: StreamDetails, seek_position: int = 0 - ) -> AsyncGenerator[bytes, None]: - """Return the audio stream for the provider item.""" - is_retry = False - while True: - try: - async for chunk in get_http_stream( - self.mass, streamdetails.data, streamdetails, seek_position - ): - yield chunk - return - except ClientResponseError as err: - if not is_retry and err.status == 403: - # cipher expired, get a fresh one - self.logger.warning("Cipher expired, trying to refresh...") - streamdetails = await self.get_stream_details(streamdetails.item_id) - continue - # raise for all other cases or we have already retried - raise - async def _post_data(self, endpoint: str, data: dict[str, str], **kwargs): """Post data to the given endpoint.""" await self._check_oauth_token() @@ -908,6 +877,8 @@ class YoutubeMusicProvider(MusicProvider): "AUDIO_QUALITY_MEDIUM": 2, "AUDIO_QUALITY_HIGH": 3, } + if "streamingData" not in track_obj: + raise MediaNotFoundError("No stream found for this track") for adaptive_format in track_obj["streamingData"]["adaptiveFormats"]: if adaptive_format["mimeType"].startswith("audio") and ( not stream_format @@ -916,6 +887,5 @@ class YoutubeMusicProvider(MusicProvider): ): stream_format = adaptive_format if stream_format is None: - msg = "No stream found for this track" - raise MediaNotFoundError(msg) + raise MediaNotFoundError("No stream found for this track") return stream_format -- 2.34.1