From 325f96beb533f3188448b501c6618420b85b8775 Mon Sep 17 00:00:00 2001 From: Marcel van der Veldt Date: Sat, 23 Mar 2024 21:55:57 +0100 Subject: [PATCH] Several improvements to playing radio streams (#1167) --- music_assistant/common/models/enums.py | 1 - music_assistant/server/controllers/streams.py | 215 ++++++++- music_assistant/server/helpers/audio.py | 443 ++++++++---------- music_assistant/server/helpers/playlists.py | 137 ++++-- music_assistant/server/helpers/process.py | 18 +- .../server/providers/airplay/__init__.py | 2 +- .../server/providers/filesystem_local/base.py | 33 +- .../server/providers/radiobrowser/__init__.py | 16 +- .../server/providers/tunein/__init__.py | 20 +- .../server/providers/url/__init__.py | 69 +-- 10 files changed, 584 insertions(+), 370 deletions(-) diff --git a/music_assistant/common/models/enums.py b/music_assistant/common/models/enums.py index 9753c3f6..fae363c7 100644 --- a/music_assistant/common/models/enums.py +++ b/music_assistant/common/models/enums.py @@ -137,7 +137,6 @@ class ContentType(StrEnum): PCM_F32LE = "f32le" # PCM 32-bit floating-point little-endian PCM_F64LE = "f64le" # PCM 64-bit floating-point little-endian PCM = "pcm" # PCM generic (details determined later) - MPEG_DASH = "dash" UNKNOWN = "?" @classmethod diff --git a/music_assistant/server/controllers/streams.py b/music_assistant/server/controllers/streams.py index 6dc0f4d3..9174f962 100644 --- a/music_assistant/server/controllers/streams.py +++ b/music_assistant/server/controllers/streams.py @@ -14,7 +14,7 @@ import time import urllib.parse from collections.abc import AsyncGenerator from contextlib import suppress -from typing import TYPE_CHECKING +from typing import TYPE_CHECKING, Any import shortuuid from aiohttp import web @@ -26,8 +26,9 @@ from music_assistant.common.models.config_entries import ( ConfigValueType, ) from music_assistant.common.models.enums import ConfigEntryType, ContentType, MediaType -from music_assistant.common.models.errors import QueueEmpty +from music_assistant.common.models.errors import AudioError, QueueEmpty from music_assistant.common.models.media_items import AudioFormat +from music_assistant.common.models.streamdetails import StreamDetails from music_assistant.constants import ( ANNOUNCE_ALERT_FILE, CONF_BIND_IP, @@ -38,15 +39,20 @@ from music_assistant.constants import ( CONF_PUBLISH_IP, SILENCE_FILE, UGP_PREFIX, + VERBOSE_LOG_LEVEL, ) from music_assistant.server.helpers.audio import LOGGER as AUDIO_LOGGER from music_assistant.server.helpers.audio import ( check_audio_support, crossfade_pcm_parts, + get_ffmpeg_args, get_ffmpeg_stream, - get_media_stream, get_player_filter_params, + get_radio_stream, + parse_loudnorm, + strip_silence, ) +from music_assistant.server.helpers.process import AsyncProcess from music_assistant.server.helpers.util import get_ips from music_assistant.server.helpers.webserver import Webserver from music_assistant.server.models.core_controller import CoreController @@ -519,8 +525,7 @@ class StreamsController(CoreController): bit_depth=queue_item.streamdetails.audio_format.bit_depth, ) async for chunk in get_ffmpeg_stream( - audio_input=get_media_stream( - self.mass, + audio_input=self._get_media_stream( streamdetails=queue_item.streamdetails, pcm_format=pcm_format, ), @@ -830,8 +835,7 @@ class StreamsController(CoreController): bytes_written = 0 buffer = b"" # handle incoming audio chunks - async for chunk in get_media_stream( - self.mass, + async for chunk in self._get_media_stream( queue_track.streamdetails, pcm_format=pcm_format, # strip silence from begin/end if track is being crossfaded @@ -940,6 +944,203 @@ class StreamsController(CoreController): ): yield chunk + async def _get_media_stream( + self, + streamdetails: StreamDetails, + pcm_format: AudioFormat, + strip_silence_begin: bool = False, + strip_silence_end: bool = False, + ) -> AsyncGenerator[tuple[bool, bytes], None]: + """ + Get the (raw PCM) audio stream for the given streamdetails. + + Other than stripping silence at end and beginning and optional + volume normalization this is the pure, unaltered audio data as PCM chunks. + """ + logger = self.logger.getChild("media_stream") + is_radio = streamdetails.media_type == MediaType.RADIO or not streamdetails.duration + if is_radio or streamdetails.seek_position: + strip_silence_begin = False + # chunk size = 2 seconds of pcm audio + pcm_sample_size = int(pcm_format.sample_rate * (pcm_format.bit_depth / 8) * 2) + chunk_size = pcm_sample_size * (1 if is_radio else 2) + expected_chunks = int((streamdetails.duration or 0) / 2) + if expected_chunks < 10: + strip_silence_end = False + + # collect all arguments for ffmpeg + filter_params = [] + extra_args = [] + seek_pos = ( + streamdetails.seek_position + if (streamdetails.direct or not streamdetails.can_seek) + else 0 + ) + if seek_pos: + # only use ffmpeg seeking if the provider stream does not support seeking + extra_args += ["-ss", str(seek_pos)] + if streamdetails.target_loudness is not None: + # add loudnorm filters + filter_rule = f"loudnorm=I={streamdetails.target_loudness}:LRA=7:tp=-2:offset=-0.5" + if streamdetails.loudness: + filter_rule += f":measured_I={streamdetails.loudness.integrated}" + filter_rule += f":measured_LRA={streamdetails.loudness.lra}" + filter_rule += f":measured_tp={streamdetails.loudness.true_peak}" + filter_rule += f":measured_thresh={streamdetails.loudness.threshold}" + filter_rule += ":print_format=json" + filter_params.append(filter_rule) + if streamdetails.fade_in: + filter_params.append("afade=type=in:start_time=0:duration=3") + + if is_radio and streamdetails.direct and streamdetails.direct.startswith("http"): + # ensure we use the radio streamer for radio items + audio_source_iterator = get_radio_stream(self.mass, streamdetails.direct, streamdetails) + input_path = "-" + elif streamdetails.direct: + audio_source_iterator = None + input_path = streamdetails.direct + else: + audio_source_iterator = self.mass.get_provider(streamdetails.provider).get_audio_stream( + streamdetails, + seek_position=streamdetails.seek_position if streamdetails.can_seek else 0, + ) + input_path = "-" + + ffmpeg_args = get_ffmpeg_args( + input_format=streamdetails.audio_format, + output_format=pcm_format, + filter_params=filter_params, + extra_args=extra_args, + input_path=input_path, + loglevel="info", # needed for loudness measurement + ) + + async def log_reader(ffmpeg_proc: AsyncProcess, state_data: dict[str, Any]): + # To prevent stderr locking up, we must keep reading it + stderr_data = "" + async for line in ffmpeg_proc.iter_stderr(): + line = line.decode().strip() # noqa: PLW2901 + if not line: + continue + if stderr_data or "loudnorm" in line: + stderr_data += line + else: + self.logger.log(VERBOSE_LOG_LEVEL, line) + + # if we reach this point, the process is finished (finish or aborted) + if ffmpeg_proc.returncode == 0: + await state_data["finished"].wait() + finished = ffmpeg_proc.returncode == 0 and state_data["finished"].is_set() + bytes_sent = state_data["bytes_sent"] + seconds_streamed = bytes_sent / pcm_sample_size if bytes_sent else 0 + streamdetails.seconds_streamed = seconds_streamed + state_str = "finished" if finished else "aborted" + logger.debug( + "stream %s for: %s (%s seconds streamed, exitcode %s)", + state_str, + streamdetails.uri, + seconds_streamed, + ffmpeg_proc.returncode, + ) + # store accurate duration + if finished: + streamdetails.duration = streamdetails.seek_position + seconds_streamed + + # parse loudnorm data if we have that collected + if stderr_data and (loudness_details := parse_loudnorm(stderr_data)): + required_seconds = 600 if streamdetails.media_type == MediaType.RADIO else 120 + if finished or (seconds_streamed >= required_seconds): + logger.debug( + "Loudness measurement for %s: %s", streamdetails.uri, loudness_details + ) + streamdetails.loudness = loudness_details + await self.mass.music.set_track_loudness( + streamdetails.item_id, streamdetails.provider, loudness_details + ) + + # report playback + if finished or seconds_streamed > 30: + self.mass.create_task( + self.mass.music.mark_item_played( + streamdetails.media_type, streamdetails.item_id, streamdetails.provider + ) + ) + if music_prov := self.mass.get_provider(streamdetails.provider): + self.mass.create_task(music_prov.on_streamed(streamdetails, seconds_streamed)) + + # cleanup + del state_data + del ffmpeg_proc + + async with AsyncProcess( + ffmpeg_args, + enable_stdin=audio_source_iterator is not None, + enable_stderr=True, + custom_stdin=audio_source_iterator, + name="ffmpeg_media_stream", + ) as ffmpeg_proc: + state_data = {"finished": asyncio.Event(), "bytes_sent": 0} + logger.debug("start media stream for: %s", streamdetails.uri) + + self.mass.create_task(log_reader(ffmpeg_proc, state_data)) + + # get pcm chunks from stdout + # we always stay one chunk behind to properly detect end of chunks + # so we can strip silence at the beginning and end of a track + prev_chunk = b"" + chunk_num = 0 + async for chunk in ffmpeg_proc.iter_chunked(chunk_size): + chunk_num += 1 + if strip_silence_begin and chunk_num == 2: + # first 2 chunks received, strip silence of beginning + stripped_audio = await strip_silence( + self.mass, + prev_chunk + chunk, + sample_rate=pcm_format.sample_rate, + bit_depth=pcm_format.bit_depth, + ) + yield stripped_audio + state_data["bytes_sent"] += len(stripped_audio) + prev_chunk = b"" + del stripped_audio + continue + if strip_silence_end and chunk_num >= (expected_chunks - 6): + # last part of the track, collect multiple chunks to strip silence later + prev_chunk += chunk + continue + + # middle part of the track, send previous chunk and collect current chunk + if prev_chunk: + yield prev_chunk + state_data["bytes_sent"] += len(prev_chunk) + + # collect this chunk for next round + prev_chunk = chunk + + # we did not receive any data, somethinh wet wrong + # raise here to prevent an endless loop elsewhere + if state_data["bytes_sent"] == 0: + raise AudioError("stream error on %s", streamdetails.uri) + + # all chunks received, strip silence of last part if needed and yield remaining bytes + if strip_silence_end and prev_chunk: + final_chunk = await strip_silence( + self.mass, + prev_chunk, + sample_rate=pcm_format.sample_rate, + bit_depth=pcm_format.bit_depth, + reverse=True, + ) + else: + final_chunk = prev_chunk + + # yield final chunk to output + yield final_chunk + state_data["bytes_sent"] += len(final_chunk) + state_data["finished"].set() + del final_chunk + del prev_chunk + def _log_request(self, request: web.Request) -> None: """Log request.""" if not self.logger.isEnabledFor(logging.DEBUG): diff --git a/music_assistant/server/helpers/audio.py b/music_assistant/server/helpers/audio.py index c6515755..17863538 100644 --- a/music_assistant/server/helpers/audio.py +++ b/music_assistant/server/helpers/audio.py @@ -7,7 +7,6 @@ import logging import os import re import struct -from contextlib import suppress from io import BytesIO from time import time from typing import TYPE_CHECKING @@ -26,7 +25,7 @@ from music_assistant.common.models.errors import ( MediaNotFoundError, MusicAssistantError, ) -from music_assistant.common.models.media_items import AudioFormat, ContentType, MediaType +from music_assistant.common.models.media_items import AudioFormat, ContentType from music_assistant.common.models.streamdetails import LoudnessMeasurement, StreamDetails from music_assistant.constants import ( CONF_EQ_BASS, @@ -38,7 +37,12 @@ from music_assistant.constants import ( ROOT_LOGGER_NAME, VERBOSE_LOG_LEVEL, ) -from music_assistant.server.helpers.playlists import fetch_playlist +from music_assistant.server.helpers.playlists import ( + HLS_CONTENT_TYPES, + IsHLSPlaylist, + fetch_playlist, + parse_m3u, +) from .process import AsyncProcess, check_output from .util import create_tempfile @@ -51,6 +55,10 @@ if TYPE_CHECKING: LOGGER = logging.getLogger(f"{ROOT_LOGGER_NAME}.audio") # pylint:disable=consider-using-f-string,too-many-locals,too-many-statements +# ruff: noqa: PLR0915 + +VLC_HEADERS = {"User-Agent": "VLC/3.0.2.LibVLC/3.0.2"} +VLC_HEADERS_ICY = {**VLC_HEADERS, "Icy-MetaData": "1"} async def crossfade_pcm_parts( @@ -258,13 +266,6 @@ async def get_stream_details( if not streamdetails.duration: streamdetails.duration = queue_item.duration - # make sure that ffmpeg handles mpeg dash streams directly - if ( - streamdetails.audio_format.content_type == ContentType.MPEG_DASH - and streamdetails.data - and streamdetails.data.startswith("http") - ): - streamdetails.direct = streamdetails.data return streamdetails @@ -322,209 +323,54 @@ def create_wave_header(samplerate=44100, channels=2, bitspersample=16, duration= return file.getvalue() -async def get_media_stream( # noqa: PLR0915 - mass: MusicAssistant, - streamdetails: StreamDetails, - pcm_format: AudioFormat, - strip_silence_begin: bool = False, - strip_silence_end: bool = False, -) -> AsyncGenerator[tuple[bool, bytes], None]: - """ - Get the (raw PCM) audio stream for the given streamdetails. - - Other than stripping silence at end and beginning and optional - volume normalization this is the pure, unaltered audio data as PCM chunks. - """ - bytes_sent = 0 - is_radio = streamdetails.media_type == MediaType.RADIO or not streamdetails.duration - if is_radio or streamdetails.seek_position: - strip_silence_begin = False - # chunk size = 2 seconds of pcm audio - pcm_sample_size = int(pcm_format.sample_rate * (pcm_format.bit_depth / 8) * 2) - chunk_size = pcm_sample_size * (1 if is_radio else 2) - expected_chunks = int((streamdetails.duration or 0) / 2) - if expected_chunks < 10: - strip_silence_end = False - - # collect all arguments for ffmpeg - filter_params = [] - extra_args = [] - seek_pos = ( - streamdetails.seek_position if (streamdetails.direct or not streamdetails.can_seek) else 0 - ) - if seek_pos: - # only use ffmpeg seeking if the provider stream does not support seeking - extra_args += ["-ss", str(seek_pos)] - if streamdetails.target_loudness is not None: - # add loudnorm filters - filter_rule = f"loudnorm=I={streamdetails.target_loudness}:LRA=7:tp=-2:offset=-0.5" - if streamdetails.loudness: - filter_rule += f":measured_I={streamdetails.loudness.integrated}" - filter_rule += f":measured_LRA={streamdetails.loudness.lra}" - filter_rule += f":measured_tp={streamdetails.loudness.true_peak}" - filter_rule += f":measured_thresh={streamdetails.loudness.threshold}" - filter_rule += ":print_format=json" - filter_params.append(filter_rule) - if streamdetails.fade_in: - filter_params.append("afade=type=in:start_time=0:duration=3") - ffmpeg_args = get_ffmpeg_args( - input_format=streamdetails.audio_format, - output_format=pcm_format, - filter_params=filter_params, - extra_args=extra_args, - input_path=streamdetails.direct or "-", - loglevel="info", # needed for loudness measurement - ) - - finished = False - - ffmpeg_proc = AsyncProcess( - ffmpeg_args, - enable_stdin=streamdetails.direct is None, - enable_stderr=True, - custom_stdin=mass.get_provider(streamdetails.provider).get_audio_stream( - streamdetails, - seek_position=streamdetails.seek_position if streamdetails.can_seek else 0, - ) - if not streamdetails.direct - else None, - name="ffmpeg_media_stream", - ) - await ffmpeg_proc.start() - logger = LOGGER.getChild("media_stream") - logger.debug("start media stream for: %s", streamdetails.uri) - - # get pcm chunks from stdout - # we always stay one chunk behind to properly detect end of chunks - # so we can strip silence at the beginning and end of a track - prev_chunk = b"" - chunk_num = 0 - try: - async for chunk in ffmpeg_proc.iter_chunked(chunk_size): - chunk_num += 1 - if strip_silence_begin and chunk_num == 2: - # first 2 chunks received, strip silence of beginning - stripped_audio = await strip_silence( - mass, - prev_chunk + chunk, - sample_rate=pcm_format.sample_rate, - bit_depth=pcm_format.bit_depth, - ) - yield stripped_audio - bytes_sent += len(stripped_audio) - prev_chunk = b"" - del stripped_audio - continue - if strip_silence_end and chunk_num >= (expected_chunks - 6): - # last part of the track, collect multiple chunks to strip silence later - prev_chunk += chunk - continue - - # middle part of the track, send previous chunk and collect current chunk - if prev_chunk: - yield prev_chunk - bytes_sent += len(prev_chunk) - prev_chunk = chunk - - # all chunks received, strip silence of last part if needed and yield remaining bytes - if strip_silence_end and prev_chunk: - final_chunk = await strip_silence( - mass, - prev_chunk, - sample_rate=pcm_format.sample_rate, - bit_depth=pcm_format.bit_depth, - reverse=True, - ) - else: - final_chunk = prev_chunk - yield final_chunk - bytes_sent += len(final_chunk) - del final_chunk - del prev_chunk - finished = True - finally: - seconds_streamed = bytes_sent / pcm_sample_size if bytes_sent else 0 - streamdetails.seconds_streamed = seconds_streamed - state_str = "finished" if finished else "aborted" - logger.debug( - "stream %s for: %s (%s seconds streamed)", - state_str, - streamdetails.uri, - seconds_streamed, - ) - # store accurate duration - if finished: - streamdetails.duration = streamdetails.seek_position + seconds_streamed - - # use communicate to read stderr and wait for exit - # read log for loudness measurement (or errors) - try: - _, stderr = await asyncio.wait_for(ffmpeg_proc.communicate(), 5) - except TimeoutError: - stderr = b"" - # ensure to send close here so we terminate and cleanup the process - await ffmpeg_proc.close() - if ffmpeg_proc.returncode != 0 and not bytes_sent: - logger.warning("stream error on %s", streamdetails.uri) - elif stderr and (loudness_details := _parse_loudnorm(stderr)): - required_seconds = 600 if streamdetails.media_type == MediaType.RADIO else 120 - if finished or (seconds_streamed >= required_seconds): - LOGGER.debug("Loudness measurement for %s: %s", streamdetails.uri, loudness_details) - streamdetails.loudness = loudness_details - await mass.music.set_track_loudness( - streamdetails.item_id, streamdetails.provider, loudness_details - ) - elif stderr: - logger.log(VERBOSE_LOG_LEVEL, stderr.decode()) - - # report playback - if finished or seconds_streamed > 30: - mass.create_task( - mass.music.mark_item_played( - streamdetails.media_type, streamdetails.item_id, streamdetails.provider - ) - ) - if music_prov := mass.get_provider(streamdetails.provider): - mass.create_task(music_prov.on_streamed(streamdetails, seconds_streamed)) - - -async def resolve_radio_stream(mass: MusicAssistant, url: str) -> tuple[str, bool]: +async def resolve_radio_stream(mass: MusicAssistant, url: str) -> tuple[str, bool, bool]: """ Resolve a streaming radio URL. Unwraps any playlists if needed. Determines if the stream supports ICY metadata. - Returns unfolded URL and a bool if the URL supports ICY metadata. + Returns tuple; + - unfolded URL as string + - bool if the URL supports ICY metadata. + - bool uf the URL represents a HLS stream/playlist. """ - cache_key = f"resolved_radio_url_{url}" + base_url = url.split("?")[0] + cache_key = f"resolved_radio_{url}" if cache := await mass.cache.get(cache_key): return cache - # handle playlisted radio urls - is_mpeg_dash = False + is_hls = False supports_icy = False - if ".m3u" in url or ".pls" in url: - # url is playlist, try to figure out how to handle it - with suppress(InvalidDataError, IndexError): - playlist = await fetch_playlist(mass, url) - if len(playlist) > 1 or ".m3u" in playlist[0] or ".pls" in playlist[0]: - # if it is an mpeg-dash stream, let ffmpeg handle that - is_mpeg_dash = True - url = playlist[0] - if not is_mpeg_dash: - # determine ICY metadata support by looking at the http headers - headers = {"Icy-MetaData": "1", "User-Agent": "VLC/3.0.2.LibVLC/3.0.2"} - timeout = ClientTimeout(total=0, connect=10, sock_read=5) - try: - async with mass.http_session.head( - url, headers=headers, allow_redirects=True, timeout=timeout - ) as resp: - headers = resp.headers - supports_icy = int(headers.get("icy-metaint", "0")) > 0 - except ClientResponseError as err: - LOGGER.debug("Error while parsing radio URL %s: %s", url, err) - - result = (url, supports_icy) + resolved_url = url + timeout = ClientTimeout(total=0, connect=10, sock_read=5) + try: + async with mass.http_session.head( + url, headers=VLC_HEADERS_ICY, allow_redirects=True, timeout=timeout + ) as resp: + resolved_url = str(resp.real_url) + headers = resp.headers + supports_icy = int(headers.get("icy-metaint", "0")) > 0 + is_hls = headers.get("content-type") in HLS_CONTENT_TYPES + if ( + base_url.endswith((".m3u", ".m3u8", ".pls")) + or headers.get("content-type") == "audio/x-mpegurl" + ): + # url is playlist, we need to unfold it + try: + for line in await fetch_playlist(mass, resolved_url): + if not line.is_url: + continue + # unfold first url of playlist + return await resolve_radio_stream(mass, line.path) + raise InvalidDataError("No content found in playlist") + except IsHLSPlaylist: + is_hls = True + + except (ClientResponseError, InvalidDataError) as err: + LOGGER.warning("Error while parsing radio URL %s: %s", url, err) + return (resolved_url, supports_icy, is_hls) + + result = (resolved_url, supports_icy, is_hls) await mass.cache.set(cache_key, result, expiration=86400) return result @@ -533,51 +379,139 @@ async def get_radio_stream( mass: MusicAssistant, url: str, streamdetails: StreamDetails ) -> AsyncGenerator[bytes, None]: """Get radio audio stream from HTTP, including metadata retrieval.""" - headers = {"Icy-MetaData": "1", "User-Agent": "VLC/3.0.2.LibVLC/3.0.2"} - timeout = ClientTimeout(total=0, connect=30, sock_read=60) - retries = 5 - while retries: + resolved_url, supports_icy, is_hls = await resolve_radio_stream(mass, url) + retries = 0 + while True: try: - async with mass.http_session.get(url, headers=headers, timeout=timeout) as resp: - headers = resp.headers - meta_int = int(headers.get("icy-metaint", "0")) - # stream with ICY Metadata - if meta_int: - LOGGER.debug("Start streaming radio with ICY metadata from url %s", url) - while True: - try: - audio_chunk = await resp.content.readexactly(meta_int) - yield audio_chunk - meta_byte = await resp.content.readexactly(1) - meta_length = ord(meta_byte) * 16 - meta_data = await resp.content.readexactly(meta_length) - except asyncio.exceptions.IncompleteReadError: - break - if not meta_data: - continue - meta_data = meta_data.rstrip(b"\0") - stream_title = re.search(rb"StreamTitle='([^']*)';", meta_data) - if not stream_title: - continue - stream_title = stream_title.group(1).decode() - if stream_title != streamdetails.stream_title: - streamdetails.stream_title = stream_title - # Regular HTTP stream - else: - LOGGER.debug("Start streaming radio without ICY metadata from url %s", url) - async for chunk in resp.content.iter_any(): - yield chunk - LOGGER.debug("Finished streaming radio from url %s", url) - except ClientError as err: - LOGGER.warning( - "Error while streaming radio %s: %s", - url, - str(err), - exc_info=err if LOGGER.isEnabledFor(logging.DEBUG) else None, - ) - if retries == 0: + retries += 1 + if is_hls: # special HLS stream + async for chunk in get_hls_stream(mass, resolved_url, streamdetails): + yield chunk + elif supports_icy: # http stream supports icy metadata + async for chunk in get_icy_stream(mass, resolved_url, streamdetails): + yield chunk + else: # generic http stream (without icy metadata) + async for chunk in get_http_stream(mass, resolved_url, streamdetails): + yield chunk + except ClientError: + LOGGER.warning("Streaming radio %s failed, retrying...", streamdetails.uri) + if retries >= 5: raise - retries -= 1 + await asyncio.sleep(1 * retries) + + +async def get_icy_stream( + mass: MusicAssistant, url: str, streamdetails: StreamDetails +) -> AsyncGenerator[bytes, None]: + """Get (radio) audio stream from HTTP, including ICY metadata retrieval.""" + timeout = ClientTimeout(total=0, connect=30, sock_read=5 * 60) + LOGGER.debug("Start streaming radio with ICY metadata from url %s", url) + async with mass.http_session.get(url, headers=VLC_HEADERS_ICY, timeout=timeout) as resp: + headers = resp.headers + meta_int = int(headers["icy-metaint"]) + while True: + try: + audio_chunk = await resp.content.readexactly(meta_int) + yield audio_chunk + meta_byte = await resp.content.readexactly(1) + meta_length = ord(meta_byte) * 16 + meta_data = await resp.content.readexactly(meta_length) + except asyncio.exceptions.IncompleteReadError: + break + if not meta_data: + continue + meta_data = meta_data.rstrip(b"\0") + stream_title = re.search(rb"StreamTitle='([^']*)';", meta_data) + if not stream_title: + continue + stream_title = stream_title.group(1).decode() + if stream_title != streamdetails.stream_title: + streamdetails.stream_title = stream_title + + +async def get_hls_stream( + mass: MusicAssistant, url: str, streamdetails: StreamDetails +) -> AsyncGenerator[bytes, None]: + """Get audio stream from HTTP HLS stream.""" + timeout = ClientTimeout(total=0, connect=30, sock_read=5 * 60) + # fetch master playlist and select (best) child playlist + # https://datatracker.ietf.org/doc/html/draft-pantos-http-live-streaming-19#section-10 + async with mass.http_session.get(url, headers=VLC_HEADERS, timeout=timeout) as resp: + charset = resp.charset or "utf-8" + master_m3u_data = await resp.text(charset) + substreams = parse_m3u(master_m3u_data) + if any(x for x in substreams if x.path.endswith(".ts")) or not all( + x for x in substreams if x.stream_info is not None + ): + # the url we got is already a substream + substream_url = url + else: + # sort substreams on best quality (highest bandwidth) + substreams.sort(key=lambda x: int(x.stream_info.get("BANDWIDTH", "0")), reverse=True) + substream = substreams[0] + substream_url = substream.path + if not substream_url.startswith("http"): + # path is relative, stitch it together + base_path = url.rsplit("/", 1)[0] + substream_url = base_path + "/" + substream.path + + async def watch_metadata(): + # ffmpeg is not (yet?) able to handle metadata updates that is provided + # in the substream playlist and/or the ID3 metadata + # so we do that here in a separate task. + # this also gets the basic + prev_chunk = "" + while True: + async with mass.http_session.get( + substream_url, headers=VLC_HEADERS, timeout=timeout + ) as resp: + charset = resp.charset or "utf-8" + substream_m3u_data = await resp.text(charset) + # get chunk-parts from the substream + hls_chunks = parse_m3u(substream_m3u_data) + metadata_found = False + for chunk_item in hls_chunks: + if chunk_item.path == prev_chunk: + continue + chunk_item_url = chunk_item.path + if not chunk_item_url.startswith("http"): + # path is relative, stitch it together + base_path = substream_url.rsplit("/", 1)[0] + chunk_item_url = base_path + "/" + chunk_item.path + if chunk_item.title and chunk_item.title != "no desc": + streamdetails.stream_title = chunk_item.title + metadata_found = True + # prevent that we play this chunk again if we loop through + prev_chunk = chunk_item.path + if chunk_item.length and chunk_item.length.isnumeric(): + await asyncio.sleep(int(chunk_item.length)) + else: + await asyncio.sleep(5) + if not metadata_found: + # this station does not provide metadata embedded in the HLS playlist + return + + LOGGER.debug( + "Start streaming HLS stream for url %s (selected substream %s)", url, substream_url + ) + + input_format = streamdetails.audio_format + output_format = streamdetails.audio_format + if streamdetails.audio_format.content_type == ContentType.UNKNOWN: + streamdetails.audio_format = AudioFormat(content_type=ContentType.AAC) + output_format = AudioFormat(content_type=ContentType.FLAC) + + try: + metadata_task = asyncio.create_task(watch_metadata()) + async for chunk in get_ffmpeg_stream( + audio_input=substream_url, + input_format=input_format, + output_format=output_format, + ): + yield chunk + finally: + if metadata_task and not metadata_task.done(): + metadata_task.cancel() async def get_http_stream( @@ -587,6 +521,7 @@ async def get_http_stream( seek_position: int = 0, ) -> AsyncGenerator[bytes, None]: """Get audio stream from HTTP.""" + LOGGER.debug("Start HTTP stream for %s (seek_position %s)", streamdetails.uri, seek_position) if seek_position: assert streamdetails.duration, "Duration required for seek requests" # try to get filesize with a head request @@ -606,7 +541,7 @@ async def get_http_stream( buffer_all = False bytes_received = 0 timeout = ClientTimeout(total=0, connect=30, sock_read=5 * 60) - async with mass.http_session.get(url, headers=headers, timeout=timeout) as resp: + async with mass.http_session.get(url, headers=VLC_HEADERS, timeout=timeout) as resp: is_partial = resp.status == 206 buffer_all = seek_position and not is_partial async for chunk in resp.content.iter_any(): @@ -624,6 +559,12 @@ async def get_http_stream( if buffer_all: skip_bytes = streamdetails.size / streamdetails.duration * seek_position yield buffer[:skip_bytes] + LOGGER.debug( + "Finished HTTP stream for %s (transferred %s/%s bytes)", + streamdetails.uri, + bytes_received, + streamdetails.size, + ) async def get_file_stream( @@ -658,6 +599,7 @@ async def get_ffmpeg_stream( filter_params: list[str] | None = None, extra_args: list[str] | None = None, chunk_size: int | None = None, + loglevel: str | None = None, ) -> AsyncGenerator[bytes, None]: """ Get the ffmpeg audio stream as async generator. @@ -665,6 +607,8 @@ async def get_ffmpeg_stream( Takes care of resampling and/or recoding if needed, according to player preferences. """ + if loglevel is None: + loglevel = "info" if LOGGER.isEnabledFor(VERBOSE_LOG_LEVEL) else "quiet" use_stdin = not isinstance(audio_input, str) ffmpeg_args = get_ffmpeg_args( input_format=input_format, @@ -673,7 +617,7 @@ async def get_ffmpeg_stream( extra_args=extra_args or [], input_path="-" if use_stdin else audio_input, output_path="-", - loglevel="info" if LOGGER.isEnabledFor(VERBOSE_LOG_LEVEL) else "quiet", + loglevel=loglevel, ) async with AsyncProcess( ffmpeg_args, @@ -943,13 +887,14 @@ def get_ffmpeg_args( return generic_args + input_args + extra_args + output_args -def _parse_loudnorm(raw_stderr: bytes | str) -> LoudnessMeasurement | None: +def parse_loudnorm(raw_stderr: bytes | str) -> LoudnessMeasurement | None: """Parse Loudness measurement from ffmpeg stderr output.""" stderr_data = raw_stderr.decode() if isinstance(raw_stderr, bytes) else raw_stderr if "[Parsed_loudnorm_" not in stderr_data: return None stderr_data = stderr_data.split("[Parsed_loudnorm_")[1] stderr_data = stderr_data.rsplit("]")[-1].strip() + stderr_data = stderr_data.rsplit("}")[0].strip() + "}" try: loudness_data = json_loads(stderr_data) except JSON_DECODE_EXCEPTIONS: diff --git a/music_assistant/server/helpers/playlists.py b/music_assistant/server/helpers/playlists.py index a8c2009c..34052dbf 100644 --- a/music_assistant/server/helpers/playlists.py +++ b/music_assistant/server/helpers/playlists.py @@ -1,11 +1,14 @@ -"""Helpers for parsing playlists.""" +"""Helpers for parsing (online and offline) playlists.""" from __future__ import annotations +import configparser import logging +from dataclasses import dataclass from typing import TYPE_CHECKING +from urllib.parse import urlparse -import aiohttp +from aiohttp import client_exceptions from music_assistant.common.models.errors import InvalidDataError @@ -14,41 +17,118 @@ if TYPE_CHECKING: LOGGER = logging.getLogger(__name__) +HLS_CONTENT_TYPES = ( + # https://tools.ietf.org/html/draft-pantos-http-live-streaming-19#section-10 + "application/vnd.apple.mpegurl", + # Additional informal types used by Mozilla gecko not included as they + # don't reliably indicate HLS streams +) -async def parse_m3u(m3u_data: str) -> list[str]: - """Parse (only) filenames/urls from m3u playlist file.""" +class IsHLSPlaylist(InvalidDataError): + """The playlist from an HLS stream and should not be parsed.""" + + +@dataclass +class PlaylistItem: + """Playlist item.""" + + path: str + length: str | None = None + title: str | None = None + stream_info: dict[str, str] | None = None + + @property + def is_url(self) -> bool: + """Validate the URL can be parsed and at least has scheme + netloc.""" + result = urlparse(self.path) + return all([result.scheme, result.netloc]) + + +def parse_m3u(m3u_data: str) -> list[PlaylistItem]: + """Very simple m3u parser. + + Based on https://github.com/dvndrsn/M3uParser/blob/master/m3uparser.py + """ + # From Mozilla gecko source: https://github.com/mozilla/gecko-dev/blob/c4c1adbae87bf2d128c39832d72498550ee1b4b8/dom/media/DecoderTraits.cpp#L47-L52 + m3u_lines = m3u_data.splitlines() - lines = [] + + playlist = [] + + length = None + title = None + stream_info = None + for line in m3u_lines: line = line.strip() # noqa: PLW2901 - if line.startswith("#"): - # ignore metadata + if line.startswith("#EXTINF:"): + # Get length and title from #EXTINF line + info = line.split("#EXTINF:")[1].split(",", 1) + if len(info) != 2: + continue + length = info[0].strip()[0] + title = info[1].strip() + elif line.startswith("#EXT-X-STREAM-INF:"): + # HLS stream properties + # https://datatracker.ietf.org/doc/html/draft-pantos-http-live-streaming-19#section-10 + stream_info = {} + for part in line.replace("#EXT-X-STREAM-INF:", "").split(","): + if "=" not in part: + continue + kev_value_parts = part.strip().split("=") + stream_info[kev_value_parts[0]] = kev_value_parts[1] + elif line.startswith("#"): + # Ignore other extensions continue - if len(line) != 0: - # Get uri/path from all other, non-blank lines - lines.append(line) + elif len(line) != 0: + # Get song path from all other, non-blank lines + playlist.append( + PlaylistItem(path=line, length=length, title=title, stream_info=stream_info) + ) + # reset the song variables so it doesn't use the same EXTINF more than once + length = None + title = None + stream_info = None - return lines + return playlist -async def parse_pls(pls_data: str) -> list[str]: +def parse_pls(pls_data: str) -> list[PlaylistItem]: """Parse (only) filenames/urls from pls playlist file.""" - pls_lines = pls_data.splitlines() - lines = [] - for line in pls_lines: - line = line.strip() # noqa: PLW2901 - if not line.startswith("File"): - # ignore metadata lines - continue - if "=" in line: - # Get uri/path from all other, non-blank lines - lines.append(line.split("=")[1]) + pls_parser = configparser.ConfigParser() + try: + pls_parser.read_string(pls_data, "playlist") + except configparser.Error as err: + raise InvalidDataError("Can't parse playlist") from err + + if "playlist" not in pls_parser or pls_parser["playlist"].getint("Version") != 2: + raise InvalidDataError("Invalid playlist") - return lines + try: + num_entries = pls_parser.getint("playlist", "NumberOfEntries") + except (configparser.NoOptionError, ValueError) as err: + raise InvalidDataError("Invalid NumberOfEntries in playlist") from err + playlist_section = pls_parser["playlist"] -async def fetch_playlist(mass: MusicAssistant, url: str) -> list[str]: + playlist = [] + for entry in range(1, num_entries + 1): + file_option = f"File{entry}" + if file_option not in playlist_section: + continue + itempath = playlist_section[file_option] + playlist.append( + PlaylistItem( + length=playlist_section.get(f"Length{entry}"), + title=playlist_section.get(f"Title{entry}"), + path=itempath, + ) + ) + return playlist + + +async def fetch_playlist(mass: MusicAssistant, url: str) -> list[PlaylistItem]: """Parse an online m3u or pls playlist.""" try: async with mass.http_session.get(url, timeout=5) as resp: @@ -61,14 +141,17 @@ async def fetch_playlist(mass: MusicAssistant, url: str) -> list[str]: except TimeoutError as err: msg = f"Timeout while fetching playlist {url}" raise InvalidDataError(msg) from err - except aiohttp.client_exceptions.ClientError as err: + except client_exceptions.ClientError as err: msg = f"Error while fetching playlist {url}" raise InvalidDataError(msg) from err + if "#EXT-X-VERSION:" in playlist_data or "#EXT-X-STREAM-INF:" in playlist_data: + raise IsHLSPlaylist + if url.endswith((".m3u", ".m3u8")): - playlist = await parse_m3u(playlist_data) + playlist = parse_m3u(playlist_data) else: - playlist = await parse_pls(playlist_data) + playlist = parse_pls(playlist_data) if not playlist: msg = f"Empty playlist {url}" diff --git a/music_assistant/server/helpers/process.py b/music_assistant/server/helpers/process.py index c529bc12..fba1d477 100644 --- a/music_assistant/server/helpers/process.py +++ b/music_assistant/server/helpers/process.py @@ -60,6 +60,7 @@ class AsyncProcess: self._custom_stdin = None self.attached_tasks.append(asyncio.create_task(self._feed_stdin(custom_stdin))) self._custom_stdout = custom_stdout + self._stderr_locked = asyncio.Lock() @property def closed(self) -> bool: @@ -166,6 +167,7 @@ class AsyncProcess: task.cancel() with suppress(asyncio.CancelledError): await task + if self.proc.returncode is None: # always first try to send sigint signal to try clean shutdown # for example ffmpeg needs this to cleanly shutdown and not lock on pipes @@ -179,8 +181,13 @@ class AsyncProcess: # especially with pipes this can cause deadlocks if not properly guarded # we need to use communicate to ensure buffers are flushed # we do that with sending communicate + if self._enable_stdin and not self.proc.stdin.is_closing(): + self.proc.stdin.close() try: - await asyncio.wait_for(self.proc.communicate(), 2) + if self.proc.stdout and self._stderr_locked.locked(): + await asyncio.wait_for(self.proc.stdout.read(), 5) + else: + await asyncio.wait_for(self.proc.communicate(), 5) except TimeoutError: LOGGER.debug( "Process %s with PID %s did not stop in time. Sending terminate...", @@ -207,10 +214,11 @@ class AsyncProcess: stdout, stderr = await self.proc.communicate(input_data) return (stdout, stderr) - async def read_stderr(self) -> AsyncGenerator[bytes, None]: - """Read lines from the stderr stream.""" - async for line in self.proc.stderr: - yield line + async def iter_stderr(self) -> AsyncGenerator[bytes, None]: + """Iterate lines from the stderr stream.""" + async with self._stderr_locked: + async for line in self.proc.stderr: + yield line async def _feed_stdin(self, custom_stdin: AsyncGenerator[bytes, None]) -> None: """Feed stdin with chunks from an AsyncGenerator.""" diff --git a/music_assistant/server/providers/airplay/__init__.py b/music_assistant/server/providers/airplay/__init__.py index 19908793..b1ced2d8 100644 --- a/music_assistant/server/providers/airplay/__init__.py +++ b/music_assistant/server/providers/airplay/__init__.py @@ -344,7 +344,7 @@ class AirplayStream: lost_packets = 0 prev_metadata_checksum: str = "" prev_progress_report: float = 0 - async for line in self._cliraop_proc.read_stderr(): + async for line in self._cliraop_proc.iter_stderr(): line = line.decode().strip() # noqa: PLW2901 if not line: continue diff --git a/music_assistant/server/providers/filesystem_local/base.py b/music_assistant/server/providers/filesystem_local/base.py index 974c7cd7..e29735f0 100644 --- a/music_assistant/server/providers/filesystem_local/base.py +++ b/music_assistant/server/providers/filesystem_local/base.py @@ -493,13 +493,13 @@ class FileSystemProviderBase(MusicProvider): playlist_data = playlist_data.decode(encoding_details["encoding"] or "utf-8") if ext in ("m3u", "m3u8"): - playlist_lines = await parse_m3u(playlist_data) + playlist_lines = parse_m3u(playlist_data) else: - playlist_lines = await parse_pls(playlist_data) + playlist_lines = parse_pls(playlist_data) - for line_no, playlist_line in enumerate(playlist_lines, 1): + for line_no, playlist_line in enumerate(playlist_lines, 0): if media_item := await self._parse_playlist_line( - playlist_line, os.path.dirname(prov_playlist_id), line_no + playlist_line.path, os.path.dirname(prov_playlist_id), line_no ): yield media_item @@ -564,7 +564,6 @@ class FileSystemProviderBase(MusicProvider): if not await self.exists(prov_playlist_id): msg = f"Playlist path does not exist: {prov_playlist_id}" raise MediaNotFoundError(msg) - cur_lines = [] _, ext = prov_playlist_id.rsplit(".", 1) # get playlist file contents @@ -573,18 +572,20 @@ class FileSystemProviderBase(MusicProvider): playlist_data += chunk encoding_details = await asyncio.to_thread(cchardet.detect, playlist_data) playlist_data = playlist_data.decode(encoding_details["encoding"] or "utf-8") - + # get current contents first if ext in ("m3u", "m3u8"): - playlist_lines = await parse_m3u(playlist_data) + playlist_items = parse_m3u(playlist_data) else: - playlist_lines = await parse_pls(playlist_data) - - for line_no, playlist_line in enumerate(playlist_lines, 1): - if line_no not in positions_to_remove: - cur_lines.append(playlist_line) - - new_playlist_data = "\n".join(cur_lines) - # write playlist file (always in utf-8) + playlist_items = parse_pls(playlist_data) + # remove items by index + for i in sorted(positions_to_remove, reverse=True): + del playlist_items[i] + + # build new playlist data + new_playlist_data = "#EXTM3U\n" + for item in playlist_items: + new_playlist_data.append(f"#EXTINF:{item.length or 0},{item.title}\n") + new_playlist_data.append(f"{item.path}\n") await self.write_file_content(prov_playlist_id, new_playlist_data.encode("utf-8")) async def create_playlist(self, name: str) -> Playlist: @@ -593,7 +594,7 @@ class FileSystemProviderBase(MusicProvider): # as creating a new (empty) file with the m3u extension... # filename = await self.resolve(f"{name}.m3u") filename = f"{name}.m3u" - await self.write_file_content(filename, b"") + await self.write_file_content(filename, b"#EXTM3U\n") return await self.get_playlist(filename) async def get_stream_details(self, item_id: str) -> StreamDetails: diff --git a/music_assistant/server/providers/radiobrowser/__init__.py b/music_assistant/server/providers/radiobrowser/__init__.py index 1ffc8992..2f6f11fd 100644 --- a/music_assistant/server/providers/radiobrowser/__init__.py +++ b/music_assistant/server/providers/radiobrowser/__init__.py @@ -22,7 +22,6 @@ from music_assistant.common.models.media_items import ( SearchResults, ) from music_assistant.common.models.streamdetails import StreamDetails -from music_assistant.server.helpers.audio import get_radio_stream, resolve_radio_stream from music_assistant.server.models.music_provider import MusicProvider SUPPORTED_FEATURES = (ProviderFeature.SEARCH, ProviderFeature.BROWSE) @@ -283,7 +282,6 @@ class RadioBrowserProvider(MusicProvider): """Get streamdetails for a radio station.""" stream = await self.radios.station(uuid=item_id) await self.radios.station_click(uuid=item_id) - url_resolved, supports_icy = await resolve_radio_stream(self.mass, stream.url_resolved) return StreamDetails( provider=self.domain, item_id=item_id, @@ -291,16 +289,6 @@ class RadioBrowserProvider(MusicProvider): content_type=ContentType.try_parse(stream.codec), ), media_type=MediaType.RADIO, - data=url_resolved, - direct=url_resolved if not supports_icy else None, - expires=time() + 24 * 3600, + direct=stream.url_resolved, + expires=time() + 3600, ) - - async def get_audio_stream( - self, - streamdetails: StreamDetails, - seek_position: int = 0, - ) -> AsyncGenerator[bytes, None]: - """Return the audio stream for the provider item.""" - async for chunk in get_radio_stream(self.mass, streamdetails.data, streamdetails): - yield chunk diff --git a/music_assistant/server/providers/tunein/__init__.py b/music_assistant/server/providers/tunein/__init__.py index 9bd8ff0e..b3e3242e 100644 --- a/music_assistant/server/providers/tunein/__init__.py +++ b/music_assistant/server/providers/tunein/__init__.py @@ -22,7 +22,6 @@ from music_assistant.common.models.media_items import ( ) from music_assistant.common.models.streamdetails import StreamDetails from music_assistant.constants import CONF_USERNAME -from music_assistant.server.helpers.audio import get_radio_stream, resolve_radio_stream from music_assistant.server.helpers.tags import parse_tags from music_assistant.server.models.music_provider import MusicProvider @@ -227,15 +226,14 @@ class TuneInProvider(MusicProvider): content_type=ContentType.UNKNOWN, ), media_type=MediaType.RADIO, - data=item_id, + direct=item_id, + expires=time() + 3600, ) stream_item_id, media_type = item_id.split("--", 1) stream_info = await self.__get_data("Tune.ashx", id=stream_item_id) for stream in stream_info["body"]: if stream["media_type"] != media_type: continue - # check if the radio stream is not a playlist - url_resolved, supports_icy = await resolve_radio_stream(self.mass, stream["url"]) return StreamDetails( provider=self.domain, item_id=item_id, @@ -243,22 +241,12 @@ class TuneInProvider(MusicProvider): content_type=ContentType(stream["media_type"]), ), media_type=MediaType.RADIO, - data=url_resolved, - expires=time() + 24 * 3600, - direct=url_resolved if not supports_icy else None, + direct=stream["url"], + expires=time() + 3600, ) msg = f"Unable to retrieve stream details for {item_id}" raise MediaNotFoundError(msg) - async def get_audio_stream( - self, - streamdetails: StreamDetails, - seek_position: int = 0, - ) -> AsyncGenerator[bytes, None]: - """Return the audio stream for the provider item.""" - async for chunk in get_radio_stream(self.mass, streamdetails.data, streamdetails): - yield chunk - async def __get_data(self, endpoint: str, **kwargs): """Get data from api.""" if endpoint.startswith("http"): diff --git a/music_assistant/server/providers/url/__init__.py b/music_assistant/server/providers/url/__init__.py index e83886be..c3ec5dd7 100644 --- a/music_assistant/server/providers/url/__init__.py +++ b/music_assistant/server/providers/url/__init__.py @@ -6,6 +6,7 @@ import os from typing import TYPE_CHECKING from music_assistant.common.models.enums import ContentType, ImageType, MediaType +from music_assistant.common.models.errors import MediaNotFoundError from music_assistant.common.models.media_items import ( Artist, AudioFormat, @@ -22,7 +23,6 @@ from music_assistant.server.helpers.audio import ( get_radio_stream, resolve_radio_stream, ) -from music_assistant.server.helpers.playlists import fetch_playlist from music_assistant.server.helpers.tags import AudioTags, parse_tags from music_assistant.server.models.music_provider import MusicProvider @@ -75,11 +75,25 @@ class URLProvider(MusicProvider): async def get_track(self, prov_track_id: str) -> Track: """Get full track details by id.""" + # always prefer db item for existing items to not overwrite user customizations + db_item = await self.mass.music.tracks.get_library_item_by_prov_id( + prov_track_id, self.instance_id + ) + if db_item is None and not prov_track_id.startswith("http"): + msg = f"Track not found: {prov_track_id}" + raise MediaNotFoundError(msg) return await self.parse_item(prov_track_id) async def get_radio(self, prov_radio_id: str) -> Radio: """Get full radio details by id.""" - return await self.parse_item(prov_radio_id, force_radio=True) + # always prefer db item for existing items to not overwrite user customizations + db_item = await self.mass.music.radio.get_library_item_by_prov_id( + prov_radio_id, self.instance_id + ) + if db_item is None and not prov_radio_id.startswith("http"): + msg = f"Radio not found: {prov_radio_id}" + raise MediaNotFoundError(msg) + return await self.parse_item(prov_radio_id) async def get_artist(self, prov_artist_id: str) -> Track: """Get full artist details by id.""" @@ -113,16 +127,16 @@ class URLProvider(MusicProvider): async def parse_item( self, - item_id_or_url: str, + url: str, force_refresh: bool = False, force_radio: bool = False, ) -> Track | Radio: """Parse plain URL to MediaItem of type Radio or Track.""" - item_id, url, media_info = await self._get_media_info(item_id_or_url, force_refresh) + media_info = await self._get_media_info(url, force_refresh) is_radio = media_info.get("icy-name") or not media_info.duration provider_mappings = { ProviderMapping( - item_id=item_id, + item_id=url, provider_domain=self.domain, provider_instance=self.instance_id, audio_format=AudioFormat( @@ -136,14 +150,14 @@ class URLProvider(MusicProvider): if is_radio or force_radio: # treat as radio media_item = Radio( - item_id=item_id, + item_id=url, provider=self.domain, name=media_info.get("icy-name") or media_info.title, provider_mappings=provider_mappings, ) else: media_item = Track( - item_id=item_id, + item_id=url, provider=self.domain, name=media_info.title, duration=int(media_info.duration or 0), @@ -157,38 +171,25 @@ class URLProvider(MusicProvider): ] return media_item - async def _get_media_info( - self, item_id_or_url: str, force_refresh: bool = False - ) -> tuple[str, str, AudioTags]: - """Retrieve (cached) mediainfo for url.""" - # check if the radio stream is not a playlist - if item_id_or_url.endswith(("m3u8", "m3u", "pls")): - playlist = await fetch_playlist(self.mass, item_id_or_url) - url = playlist[0] - item_id = item_id_or_url - self._full_url[item_id] = url - else: - url = self._full_url.get(item_id_or_url, item_id_or_url) - item_id = item_id_or_url - cache_key = f"{self.instance_id}.media_info.{item_id}" + async def _get_media_info(self, url: str, force_refresh: bool = False) -> AudioTags: + """Retrieve mediainfo for url.""" # do we have some cached info for this url ? + cache_key = f"{self.instance_id}.media_info.{url}" cached_info = await self.mass.cache.get(cache_key) if cached_info and not force_refresh: - media_info = AudioTags.parse(cached_info) - else: - # parse info with ffprobe (and store in cache) - media_info = await parse_tags(url) - if "authSig" in url: - media_info.has_cover_image = False - await self.mass.cache.set(cache_key, media_info.raw) - return (item_id, url, media_info) + return AudioTags.parse(cached_info) + # parse info with ffprobe (and store in cache) + resolved_url, _, _ = await resolve_radio_stream(self.mass, url) + media_info = await parse_tags(resolved_url) + if "authSig" in url: + media_info.has_cover_image = False + await self.mass.cache.set(cache_key, media_info.raw) + return media_info async def get_stream_details(self, item_id: str) -> StreamDetails: """Get streamdetails for a track/radio.""" - item_id, url, media_info = await self._get_media_info(item_id) + media_info = await self._get_media_info(item_id) is_radio = media_info.get("icy-name") or not media_info.duration - if is_radio: - url, supports_icy = await resolve_radio_stream(self.mass, url) return StreamDetails( provider=self.instance_id, item_id=item_id, @@ -198,8 +199,8 @@ class URLProvider(MusicProvider): bit_depth=media_info.bits_per_sample, ), media_type=MediaType.RADIO if is_radio else MediaType.TRACK, - direct=None if is_radio and supports_icy else url, - data=url, + direct=item_id if is_radio else None, + data=item_id, ) async def get_audio_stream( -- 2.34.1