Several improvements to playing radio streams (#1167)
authorMarcel van der Veldt <m.vanderveldt@outlook.com>
Sat, 23 Mar 2024 20:55:57 +0000 (21:55 +0100)
committerGitHub <noreply@github.com>
Sat, 23 Mar 2024 20:55:57 +0000 (21:55 +0100)
music_assistant/common/models/enums.py
music_assistant/server/controllers/streams.py
music_assistant/server/helpers/audio.py
music_assistant/server/helpers/playlists.py
music_assistant/server/helpers/process.py
music_assistant/server/providers/airplay/__init__.py
music_assistant/server/providers/filesystem_local/base.py
music_assistant/server/providers/radiobrowser/__init__.py
music_assistant/server/providers/tunein/__init__.py
music_assistant/server/providers/url/__init__.py

index 9753c3f66cdf4613ad6a78d924d5a85b7fa2c822..fae363c790919d0faf29008e399cba586f3ece50 100644 (file)
@@ -137,7 +137,6 @@ class ContentType(StrEnum):
     PCM_F32LE = "f32le"  # PCM 32-bit floating-point little-endian
     PCM_F64LE = "f64le"  # PCM 64-bit floating-point little-endian
     PCM = "pcm"  # PCM generic (details determined later)
-    MPEG_DASH = "dash"
     UNKNOWN = "?"
 
     @classmethod
index 6dc0f4d34217588ad03d3ba733c86fb2638b13e4..9174f962252c155a996b012a0f01d1878ca3bbf5 100644 (file)
@@ -14,7 +14,7 @@ import time
 import urllib.parse
 from collections.abc import AsyncGenerator
 from contextlib import suppress
-from typing import TYPE_CHECKING
+from typing import TYPE_CHECKING, Any
 
 import shortuuid
 from aiohttp import web
@@ -26,8 +26,9 @@ from music_assistant.common.models.config_entries import (
     ConfigValueType,
 )
 from music_assistant.common.models.enums import ConfigEntryType, ContentType, MediaType
-from music_assistant.common.models.errors import QueueEmpty
+from music_assistant.common.models.errors import AudioError, QueueEmpty
 from music_assistant.common.models.media_items import AudioFormat
+from music_assistant.common.models.streamdetails import StreamDetails
 from music_assistant.constants import (
     ANNOUNCE_ALERT_FILE,
     CONF_BIND_IP,
@@ -38,15 +39,20 @@ from music_assistant.constants import (
     CONF_PUBLISH_IP,
     SILENCE_FILE,
     UGP_PREFIX,
+    VERBOSE_LOG_LEVEL,
 )
 from music_assistant.server.helpers.audio import LOGGER as AUDIO_LOGGER
 from music_assistant.server.helpers.audio import (
     check_audio_support,
     crossfade_pcm_parts,
+    get_ffmpeg_args,
     get_ffmpeg_stream,
-    get_media_stream,
     get_player_filter_params,
+    get_radio_stream,
+    parse_loudnorm,
+    strip_silence,
 )
+from music_assistant.server.helpers.process import AsyncProcess
 from music_assistant.server.helpers.util import get_ips
 from music_assistant.server.helpers.webserver import Webserver
 from music_assistant.server.models.core_controller import CoreController
@@ -519,8 +525,7 @@ class StreamsController(CoreController):
             bit_depth=queue_item.streamdetails.audio_format.bit_depth,
         )
         async for chunk in get_ffmpeg_stream(
-            audio_input=get_media_stream(
-                self.mass,
+            audio_input=self._get_media_stream(
                 streamdetails=queue_item.streamdetails,
                 pcm_format=pcm_format,
             ),
@@ -830,8 +835,7 @@ class StreamsController(CoreController):
             bytes_written = 0
             buffer = b""
             # handle incoming audio chunks
-            async for chunk in get_media_stream(
-                self.mass,
+            async for chunk in self._get_media_stream(
                 queue_track.streamdetails,
                 pcm_format=pcm_format,
                 # strip silence from begin/end if track is being crossfaded
@@ -940,6 +944,203 @@ class StreamsController(CoreController):
         ):
             yield chunk
 
+    async def _get_media_stream(
+        self,
+        streamdetails: StreamDetails,
+        pcm_format: AudioFormat,
+        strip_silence_begin: bool = False,
+        strip_silence_end: bool = False,
+    ) -> AsyncGenerator[tuple[bool, bytes], None]:
+        """
+        Get the (raw PCM) audio stream for the given streamdetails.
+
+        Other than stripping silence at end and beginning and optional
+        volume normalization this is the pure, unaltered audio data as PCM chunks.
+        """
+        logger = self.logger.getChild("media_stream")
+        is_radio = streamdetails.media_type == MediaType.RADIO or not streamdetails.duration
+        if is_radio or streamdetails.seek_position:
+            strip_silence_begin = False
+        # chunk size = 2 seconds of pcm audio
+        pcm_sample_size = int(pcm_format.sample_rate * (pcm_format.bit_depth / 8) * 2)
+        chunk_size = pcm_sample_size * (1 if is_radio else 2)
+        expected_chunks = int((streamdetails.duration or 0) / 2)
+        if expected_chunks < 10:
+            strip_silence_end = False
+
+        # collect all arguments for ffmpeg
+        filter_params = []
+        extra_args = []
+        seek_pos = (
+            streamdetails.seek_position
+            if (streamdetails.direct or not streamdetails.can_seek)
+            else 0
+        )
+        if seek_pos:
+            # only use ffmpeg seeking if the provider stream does not support seeking
+            extra_args += ["-ss", str(seek_pos)]
+        if streamdetails.target_loudness is not None:
+            # add loudnorm filters
+            filter_rule = f"loudnorm=I={streamdetails.target_loudness}:LRA=7:tp=-2:offset=-0.5"
+            if streamdetails.loudness:
+                filter_rule += f":measured_I={streamdetails.loudness.integrated}"
+                filter_rule += f":measured_LRA={streamdetails.loudness.lra}"
+                filter_rule += f":measured_tp={streamdetails.loudness.true_peak}"
+                filter_rule += f":measured_thresh={streamdetails.loudness.threshold}"
+            filter_rule += ":print_format=json"
+            filter_params.append(filter_rule)
+        if streamdetails.fade_in:
+            filter_params.append("afade=type=in:start_time=0:duration=3")
+
+        if is_radio and streamdetails.direct and streamdetails.direct.startswith("http"):
+            # ensure we use the radio streamer for radio items
+            audio_source_iterator = get_radio_stream(self.mass, streamdetails.direct, streamdetails)
+            input_path = "-"
+        elif streamdetails.direct:
+            audio_source_iterator = None
+            input_path = streamdetails.direct
+        else:
+            audio_source_iterator = self.mass.get_provider(streamdetails.provider).get_audio_stream(
+                streamdetails,
+                seek_position=streamdetails.seek_position if streamdetails.can_seek else 0,
+            )
+            input_path = "-"
+
+        ffmpeg_args = get_ffmpeg_args(
+            input_format=streamdetails.audio_format,
+            output_format=pcm_format,
+            filter_params=filter_params,
+            extra_args=extra_args,
+            input_path=input_path,
+            loglevel="info",  # needed for loudness measurement
+        )
+
+        async def log_reader(ffmpeg_proc: AsyncProcess, state_data: dict[str, Any]):
+            # To prevent stderr locking up, we must keep reading it
+            stderr_data = ""
+            async for line in ffmpeg_proc.iter_stderr():
+                line = line.decode().strip()  # noqa: PLW2901
+                if not line:
+                    continue
+                if stderr_data or "loudnorm" in line:
+                    stderr_data += line
+                else:
+                    self.logger.log(VERBOSE_LOG_LEVEL, line)
+
+            # if we reach this point, the process is finished (finish or aborted)
+            if ffmpeg_proc.returncode == 0:
+                await state_data["finished"].wait()
+            finished = ffmpeg_proc.returncode == 0 and state_data["finished"].is_set()
+            bytes_sent = state_data["bytes_sent"]
+            seconds_streamed = bytes_sent / pcm_sample_size if bytes_sent else 0
+            streamdetails.seconds_streamed = seconds_streamed
+            state_str = "finished" if finished else "aborted"
+            logger.debug(
+                "stream %s for: %s (%s seconds streamed, exitcode %s)",
+                state_str,
+                streamdetails.uri,
+                seconds_streamed,
+                ffmpeg_proc.returncode,
+            )
+            # store accurate duration
+            if finished:
+                streamdetails.duration = streamdetails.seek_position + seconds_streamed
+
+            # parse loudnorm data if we have that collected
+            if stderr_data and (loudness_details := parse_loudnorm(stderr_data)):
+                required_seconds = 600 if streamdetails.media_type == MediaType.RADIO else 120
+                if finished or (seconds_streamed >= required_seconds):
+                    logger.debug(
+                        "Loudness measurement for %s: %s", streamdetails.uri, loudness_details
+                    )
+                    streamdetails.loudness = loudness_details
+                    await self.mass.music.set_track_loudness(
+                        streamdetails.item_id, streamdetails.provider, loudness_details
+                    )
+
+            # report playback
+            if finished or seconds_streamed > 30:
+                self.mass.create_task(
+                    self.mass.music.mark_item_played(
+                        streamdetails.media_type, streamdetails.item_id, streamdetails.provider
+                    )
+                )
+                if music_prov := self.mass.get_provider(streamdetails.provider):
+                    self.mass.create_task(music_prov.on_streamed(streamdetails, seconds_streamed))
+
+            # cleanup
+            del state_data
+            del ffmpeg_proc
+
+        async with AsyncProcess(
+            ffmpeg_args,
+            enable_stdin=audio_source_iterator is not None,
+            enable_stderr=True,
+            custom_stdin=audio_source_iterator,
+            name="ffmpeg_media_stream",
+        ) as ffmpeg_proc:
+            state_data = {"finished": asyncio.Event(), "bytes_sent": 0}
+            logger.debug("start media stream for: %s", streamdetails.uri)
+
+            self.mass.create_task(log_reader(ffmpeg_proc, state_data))
+
+            # get pcm chunks from stdout
+            # we always stay one chunk behind to properly detect end of chunks
+            # so we can strip silence at the beginning and end of a track
+            prev_chunk = b""
+            chunk_num = 0
+            async for chunk in ffmpeg_proc.iter_chunked(chunk_size):
+                chunk_num += 1
+                if strip_silence_begin and chunk_num == 2:
+                    # first 2 chunks received, strip silence of beginning
+                    stripped_audio = await strip_silence(
+                        self.mass,
+                        prev_chunk + chunk,
+                        sample_rate=pcm_format.sample_rate,
+                        bit_depth=pcm_format.bit_depth,
+                    )
+                    yield stripped_audio
+                    state_data["bytes_sent"] += len(stripped_audio)
+                    prev_chunk = b""
+                    del stripped_audio
+                    continue
+                if strip_silence_end and chunk_num >= (expected_chunks - 6):
+                    # last part of the track, collect multiple chunks to strip silence later
+                    prev_chunk += chunk
+                    continue
+
+                # middle part of the track, send previous chunk and collect current chunk
+                if prev_chunk:
+                    yield prev_chunk
+                    state_data["bytes_sent"] += len(prev_chunk)
+
+                # collect this chunk for next round
+                prev_chunk = chunk
+
+            # we did not receive any data, somethinh wet wrong
+            # raise here to prevent an endless loop elsewhere
+            if state_data["bytes_sent"] == 0:
+                raise AudioError("stream error on %s", streamdetails.uri)
+
+            # all chunks received, strip silence of last part if needed and yield remaining bytes
+            if strip_silence_end and prev_chunk:
+                final_chunk = await strip_silence(
+                    self.mass,
+                    prev_chunk,
+                    sample_rate=pcm_format.sample_rate,
+                    bit_depth=pcm_format.bit_depth,
+                    reverse=True,
+                )
+            else:
+                final_chunk = prev_chunk
+
+            # yield final chunk to output
+            yield final_chunk
+            state_data["bytes_sent"] += len(final_chunk)
+            state_data["finished"].set()
+            del final_chunk
+            del prev_chunk
+
     def _log_request(self, request: web.Request) -> None:
         """Log request."""
         if not self.logger.isEnabledFor(logging.DEBUG):
index c6515755bfcca412d3c3d0b0124c7cb57791a4d8..178635388f915bd9f7f1e67b94112a82494da511 100644 (file)
@@ -7,7 +7,6 @@ import logging
 import os
 import re
 import struct
-from contextlib import suppress
 from io import BytesIO
 from time import time
 from typing import TYPE_CHECKING
@@ -26,7 +25,7 @@ from music_assistant.common.models.errors import (
     MediaNotFoundError,
     MusicAssistantError,
 )
-from music_assistant.common.models.media_items import AudioFormat, ContentType, MediaType
+from music_assistant.common.models.media_items import AudioFormat, ContentType
 from music_assistant.common.models.streamdetails import LoudnessMeasurement, StreamDetails
 from music_assistant.constants import (
     CONF_EQ_BASS,
@@ -38,7 +37,12 @@ from music_assistant.constants import (
     ROOT_LOGGER_NAME,
     VERBOSE_LOG_LEVEL,
 )
-from music_assistant.server.helpers.playlists import fetch_playlist
+from music_assistant.server.helpers.playlists import (
+    HLS_CONTENT_TYPES,
+    IsHLSPlaylist,
+    fetch_playlist,
+    parse_m3u,
+)
 
 from .process import AsyncProcess, check_output
 from .util import create_tempfile
@@ -51,6 +55,10 @@ if TYPE_CHECKING:
 
 LOGGER = logging.getLogger(f"{ROOT_LOGGER_NAME}.audio")
 # pylint:disable=consider-using-f-string,too-many-locals,too-many-statements
+# ruff: noqa: PLR0915
+
+VLC_HEADERS = {"User-Agent": "VLC/3.0.2.LibVLC/3.0.2"}
+VLC_HEADERS_ICY = {**VLC_HEADERS, "Icy-MetaData": "1"}
 
 
 async def crossfade_pcm_parts(
@@ -258,13 +266,6 @@ async def get_stream_details(
 
     if not streamdetails.duration:
         streamdetails.duration = queue_item.duration
-    # make sure that ffmpeg handles mpeg dash streams directly
-    if (
-        streamdetails.audio_format.content_type == ContentType.MPEG_DASH
-        and streamdetails.data
-        and streamdetails.data.startswith("http")
-    ):
-        streamdetails.direct = streamdetails.data
     return streamdetails
 
 
@@ -322,209 +323,54 @@ def create_wave_header(samplerate=44100, channels=2, bitspersample=16, duration=
     return file.getvalue()
 
 
-async def get_media_stream(  # noqa: PLR0915
-    mass: MusicAssistant,
-    streamdetails: StreamDetails,
-    pcm_format: AudioFormat,
-    strip_silence_begin: bool = False,
-    strip_silence_end: bool = False,
-) -> AsyncGenerator[tuple[bool, bytes], None]:
-    """
-    Get the (raw PCM) audio stream for the given streamdetails.
-
-    Other than stripping silence at end and beginning and optional
-    volume normalization this is the pure, unaltered audio data as PCM chunks.
-    """
-    bytes_sent = 0
-    is_radio = streamdetails.media_type == MediaType.RADIO or not streamdetails.duration
-    if is_radio or streamdetails.seek_position:
-        strip_silence_begin = False
-    # chunk size = 2 seconds of pcm audio
-    pcm_sample_size = int(pcm_format.sample_rate * (pcm_format.bit_depth / 8) * 2)
-    chunk_size = pcm_sample_size * (1 if is_radio else 2)
-    expected_chunks = int((streamdetails.duration or 0) / 2)
-    if expected_chunks < 10:
-        strip_silence_end = False
-
-    # collect all arguments for ffmpeg
-    filter_params = []
-    extra_args = []
-    seek_pos = (
-        streamdetails.seek_position if (streamdetails.direct or not streamdetails.can_seek) else 0
-    )
-    if seek_pos:
-        # only use ffmpeg seeking if the provider stream does not support seeking
-        extra_args += ["-ss", str(seek_pos)]
-    if streamdetails.target_loudness is not None:
-        # add loudnorm filters
-        filter_rule = f"loudnorm=I={streamdetails.target_loudness}:LRA=7:tp=-2:offset=-0.5"
-        if streamdetails.loudness:
-            filter_rule += f":measured_I={streamdetails.loudness.integrated}"
-            filter_rule += f":measured_LRA={streamdetails.loudness.lra}"
-            filter_rule += f":measured_tp={streamdetails.loudness.true_peak}"
-            filter_rule += f":measured_thresh={streamdetails.loudness.threshold}"
-        filter_rule += ":print_format=json"
-        filter_params.append(filter_rule)
-    if streamdetails.fade_in:
-        filter_params.append("afade=type=in:start_time=0:duration=3")
-    ffmpeg_args = get_ffmpeg_args(
-        input_format=streamdetails.audio_format,
-        output_format=pcm_format,
-        filter_params=filter_params,
-        extra_args=extra_args,
-        input_path=streamdetails.direct or "-",
-        loglevel="info",  # needed for loudness measurement
-    )
-
-    finished = False
-
-    ffmpeg_proc = AsyncProcess(
-        ffmpeg_args,
-        enable_stdin=streamdetails.direct is None,
-        enable_stderr=True,
-        custom_stdin=mass.get_provider(streamdetails.provider).get_audio_stream(
-            streamdetails,
-            seek_position=streamdetails.seek_position if streamdetails.can_seek else 0,
-        )
-        if not streamdetails.direct
-        else None,
-        name="ffmpeg_media_stream",
-    )
-    await ffmpeg_proc.start()
-    logger = LOGGER.getChild("media_stream")
-    logger.debug("start media stream for: %s", streamdetails.uri)
-
-    # get pcm chunks from stdout
-    # we always stay one chunk behind to properly detect end of chunks
-    # so we can strip silence at the beginning and end of a track
-    prev_chunk = b""
-    chunk_num = 0
-    try:
-        async for chunk in ffmpeg_proc.iter_chunked(chunk_size):
-            chunk_num += 1
-            if strip_silence_begin and chunk_num == 2:
-                # first 2 chunks received, strip silence of beginning
-                stripped_audio = await strip_silence(
-                    mass,
-                    prev_chunk + chunk,
-                    sample_rate=pcm_format.sample_rate,
-                    bit_depth=pcm_format.bit_depth,
-                )
-                yield stripped_audio
-                bytes_sent += len(stripped_audio)
-                prev_chunk = b""
-                del stripped_audio
-                continue
-            if strip_silence_end and chunk_num >= (expected_chunks - 6):
-                # last part of the track, collect multiple chunks to strip silence later
-                prev_chunk += chunk
-                continue
-
-            # middle part of the track, send previous chunk and collect current chunk
-            if prev_chunk:
-                yield prev_chunk
-                bytes_sent += len(prev_chunk)
-            prev_chunk = chunk
-
-        # all chunks received, strip silence of last part if needed and yield remaining bytes
-        if strip_silence_end and prev_chunk:
-            final_chunk = await strip_silence(
-                mass,
-                prev_chunk,
-                sample_rate=pcm_format.sample_rate,
-                bit_depth=pcm_format.bit_depth,
-                reverse=True,
-            )
-        else:
-            final_chunk = prev_chunk
-        yield final_chunk
-        bytes_sent += len(final_chunk)
-        del final_chunk
-        del prev_chunk
-        finished = True
-    finally:
-        seconds_streamed = bytes_sent / pcm_sample_size if bytes_sent else 0
-        streamdetails.seconds_streamed = seconds_streamed
-        state_str = "finished" if finished else "aborted"
-        logger.debug(
-            "stream %s for: %s (%s seconds streamed)",
-            state_str,
-            streamdetails.uri,
-            seconds_streamed,
-        )
-        # store accurate duration
-        if finished:
-            streamdetails.duration = streamdetails.seek_position + seconds_streamed
-
-        # use communicate to read stderr and wait for exit
-        # read log for loudness measurement (or errors)
-        try:
-            _, stderr = await asyncio.wait_for(ffmpeg_proc.communicate(), 5)
-        except TimeoutError:
-            stderr = b""
-            # ensure to send close here so we terminate and cleanup the process
-            await ffmpeg_proc.close()
-        if ffmpeg_proc.returncode != 0 and not bytes_sent:
-            logger.warning("stream error on %s", streamdetails.uri)
-        elif stderr and (loudness_details := _parse_loudnorm(stderr)):
-            required_seconds = 600 if streamdetails.media_type == MediaType.RADIO else 120
-            if finished or (seconds_streamed >= required_seconds):
-                LOGGER.debug("Loudness measurement for %s: %s", streamdetails.uri, loudness_details)
-                streamdetails.loudness = loudness_details
-                await mass.music.set_track_loudness(
-                    streamdetails.item_id, streamdetails.provider, loudness_details
-                )
-        elif stderr:
-            logger.log(VERBOSE_LOG_LEVEL, stderr.decode())
-
-        # report playback
-        if finished or seconds_streamed > 30:
-            mass.create_task(
-                mass.music.mark_item_played(
-                    streamdetails.media_type, streamdetails.item_id, streamdetails.provider
-                )
-            )
-            if music_prov := mass.get_provider(streamdetails.provider):
-                mass.create_task(music_prov.on_streamed(streamdetails, seconds_streamed))
-
-
-async def resolve_radio_stream(mass: MusicAssistant, url: str) -> tuple[str, bool]:
+async def resolve_radio_stream(mass: MusicAssistant, url: str) -> tuple[str, bool, bool]:
     """
     Resolve a streaming radio URL.
 
     Unwraps any playlists if needed.
     Determines if the stream supports ICY metadata.
 
-    Returns unfolded URL and a bool if the URL supports ICY metadata.
+    Returns tuple;
+    - unfolded URL as string
+    - bool if the URL supports ICY metadata.
+    - bool uf the URL represents a HLS stream/playlist.
     """
-    cache_key = f"resolved_radio_url_{url}"
+    base_url = url.split("?")[0]
+    cache_key = f"resolved_radio_{url}"
     if cache := await mass.cache.get(cache_key):
         return cache
-    # handle playlisted radio urls
-    is_mpeg_dash = False
+    is_hls = False
     supports_icy = False
-    if ".m3u" in url or ".pls" in url:
-        # url is playlist, try to figure out how to handle it
-        with suppress(InvalidDataError, IndexError):
-            playlist = await fetch_playlist(mass, url)
-            if len(playlist) > 1 or ".m3u" in playlist[0] or ".pls" in playlist[0]:
-                # if it is an mpeg-dash stream, let ffmpeg handle that
-                is_mpeg_dash = True
-            url = playlist[0]
-    if not is_mpeg_dash:
-        # determine ICY metadata support by looking at the http headers
-        headers = {"Icy-MetaData": "1", "User-Agent": "VLC/3.0.2.LibVLC/3.0.2"}
-        timeout = ClientTimeout(total=0, connect=10, sock_read=5)
-        try:
-            async with mass.http_session.head(
-                url, headers=headers, allow_redirects=True, timeout=timeout
-            ) as resp:
-                headers = resp.headers
-                supports_icy = int(headers.get("icy-metaint", "0")) > 0
-        except ClientResponseError as err:
-            LOGGER.debug("Error while parsing radio URL %s: %s", url, err)
-
-    result = (url, supports_icy)
+    resolved_url = url
+    timeout = ClientTimeout(total=0, connect=10, sock_read=5)
+    try:
+        async with mass.http_session.head(
+            url, headers=VLC_HEADERS_ICY, allow_redirects=True, timeout=timeout
+        ) as resp:
+            resolved_url = str(resp.real_url)
+            headers = resp.headers
+        supports_icy = int(headers.get("icy-metaint", "0")) > 0
+        is_hls = headers.get("content-type") in HLS_CONTENT_TYPES
+        if (
+            base_url.endswith((".m3u", ".m3u8", ".pls"))
+            or headers.get("content-type") == "audio/x-mpegurl"
+        ):
+            # url is playlist, we need to unfold it
+            try:
+                for line in await fetch_playlist(mass, resolved_url):
+                    if not line.is_url:
+                        continue
+                    # unfold first url of playlist
+                    return await resolve_radio_stream(mass, line.path)
+                raise InvalidDataError("No content found in playlist")
+            except IsHLSPlaylist:
+                is_hls = True
+
+    except (ClientResponseError, InvalidDataError) as err:
+        LOGGER.warning("Error while parsing radio URL %s: %s", url, err)
+        return (resolved_url, supports_icy, is_hls)
+
+    result = (resolved_url, supports_icy, is_hls)
     await mass.cache.set(cache_key, result, expiration=86400)
     return result
 
@@ -533,51 +379,139 @@ async def get_radio_stream(
     mass: MusicAssistant, url: str, streamdetails: StreamDetails
 ) -> AsyncGenerator[bytes, None]:
     """Get radio audio stream from HTTP, including metadata retrieval."""
-    headers = {"Icy-MetaData": "1", "User-Agent": "VLC/3.0.2.LibVLC/3.0.2"}
-    timeout = ClientTimeout(total=0, connect=30, sock_read=60)
-    retries = 5
-    while retries:
+    resolved_url, supports_icy, is_hls = await resolve_radio_stream(mass, url)
+    retries = 0
+    while True:
         try:
-            async with mass.http_session.get(url, headers=headers, timeout=timeout) as resp:
-                headers = resp.headers
-                meta_int = int(headers.get("icy-metaint", "0"))
-                # stream with ICY Metadata
-                if meta_int:
-                    LOGGER.debug("Start streaming radio with ICY metadata from url %s", url)
-                    while True:
-                        try:
-                            audio_chunk = await resp.content.readexactly(meta_int)
-                            yield audio_chunk
-                            meta_byte = await resp.content.readexactly(1)
-                            meta_length = ord(meta_byte) * 16
-                            meta_data = await resp.content.readexactly(meta_length)
-                        except asyncio.exceptions.IncompleteReadError:
-                            break
-                        if not meta_data:
-                            continue
-                        meta_data = meta_data.rstrip(b"\0")
-                        stream_title = re.search(rb"StreamTitle='([^']*)';", meta_data)
-                        if not stream_title:
-                            continue
-                        stream_title = stream_title.group(1).decode()
-                        if stream_title != streamdetails.stream_title:
-                            streamdetails.stream_title = stream_title
-                # Regular HTTP stream
-                else:
-                    LOGGER.debug("Start streaming radio without ICY metadata from url %s", url)
-                    async for chunk in resp.content.iter_any():
-                        yield chunk
-                LOGGER.debug("Finished streaming radio from url %s", url)
-        except ClientError as err:
-            LOGGER.warning(
-                "Error while streaming radio %s: %s",
-                url,
-                str(err),
-                exc_info=err if LOGGER.isEnabledFor(logging.DEBUG) else None,
-            )
-            if retries == 0:
+            retries += 1
+            if is_hls:  # special HLS stream
+                async for chunk in get_hls_stream(mass, resolved_url, streamdetails):
+                    yield chunk
+            elif supports_icy:  # http stream supports icy metadata
+                async for chunk in get_icy_stream(mass, resolved_url, streamdetails):
+                    yield chunk
+            else:  # generic http stream (without icy metadata)
+                async for chunk in get_http_stream(mass, resolved_url, streamdetails):
+                    yield chunk
+        except ClientError:
+            LOGGER.warning("Streaming radio %s failed, retrying...", streamdetails.uri)
+            if retries >= 5:
                 raise
-            retries -= 1
+            await asyncio.sleep(1 * retries)
+
+
+async def get_icy_stream(
+    mass: MusicAssistant, url: str, streamdetails: StreamDetails
+) -> AsyncGenerator[bytes, None]:
+    """Get (radio) audio stream from HTTP, including ICY metadata retrieval."""
+    timeout = ClientTimeout(total=0, connect=30, sock_read=5 * 60)
+    LOGGER.debug("Start streaming radio with ICY metadata from url %s", url)
+    async with mass.http_session.get(url, headers=VLC_HEADERS_ICY, timeout=timeout) as resp:
+        headers = resp.headers
+        meta_int = int(headers["icy-metaint"])
+        while True:
+            try:
+                audio_chunk = await resp.content.readexactly(meta_int)
+                yield audio_chunk
+                meta_byte = await resp.content.readexactly(1)
+                meta_length = ord(meta_byte) * 16
+                meta_data = await resp.content.readexactly(meta_length)
+            except asyncio.exceptions.IncompleteReadError:
+                break
+            if not meta_data:
+                continue
+            meta_data = meta_data.rstrip(b"\0")
+            stream_title = re.search(rb"StreamTitle='([^']*)';", meta_data)
+            if not stream_title:
+                continue
+            stream_title = stream_title.group(1).decode()
+            if stream_title != streamdetails.stream_title:
+                streamdetails.stream_title = stream_title
+
+
+async def get_hls_stream(
+    mass: MusicAssistant, url: str, streamdetails: StreamDetails
+) -> AsyncGenerator[bytes, None]:
+    """Get audio stream from HTTP HLS stream."""
+    timeout = ClientTimeout(total=0, connect=30, sock_read=5 * 60)
+    # fetch master playlist and select (best) child playlist
+    # https://datatracker.ietf.org/doc/html/draft-pantos-http-live-streaming-19#section-10
+    async with mass.http_session.get(url, headers=VLC_HEADERS, timeout=timeout) as resp:
+        charset = resp.charset or "utf-8"
+        master_m3u_data = await resp.text(charset)
+    substreams = parse_m3u(master_m3u_data)
+    if any(x for x in substreams if x.path.endswith(".ts")) or not all(
+        x for x in substreams if x.stream_info is not None
+    ):
+        # the url we got is already a substream
+        substream_url = url
+    else:
+        # sort substreams on best quality (highest bandwidth)
+        substreams.sort(key=lambda x: int(x.stream_info.get("BANDWIDTH", "0")), reverse=True)
+        substream = substreams[0]
+        substream_url = substream.path
+        if not substream_url.startswith("http"):
+            # path is relative, stitch it together
+            base_path = url.rsplit("/", 1)[0]
+            substream_url = base_path + "/" + substream.path
+
+    async def watch_metadata():
+        # ffmpeg is not (yet?) able to handle metadata updates that is provided
+        # in the substream playlist and/or the ID3 metadata
+        # so we do that here in a separate task.
+        # this also gets the basic
+        prev_chunk = ""
+        while True:
+            async with mass.http_session.get(
+                substream_url, headers=VLC_HEADERS, timeout=timeout
+            ) as resp:
+                charset = resp.charset or "utf-8"
+                substream_m3u_data = await resp.text(charset)
+            # get chunk-parts from the substream
+            hls_chunks = parse_m3u(substream_m3u_data)
+            metadata_found = False
+            for chunk_item in hls_chunks:
+                if chunk_item.path == prev_chunk:
+                    continue
+                chunk_item_url = chunk_item.path
+                if not chunk_item_url.startswith("http"):
+                    # path is relative, stitch it together
+                    base_path = substream_url.rsplit("/", 1)[0]
+                    chunk_item_url = base_path + "/" + chunk_item.path
+                if chunk_item.title and chunk_item.title != "no desc":
+                    streamdetails.stream_title = chunk_item.title
+                    metadata_found = True
+                # prevent that we play this chunk again if we loop through
+                prev_chunk = chunk_item.path
+                if chunk_item.length and chunk_item.length.isnumeric():
+                    await asyncio.sleep(int(chunk_item.length))
+                else:
+                    await asyncio.sleep(5)
+            if not metadata_found:
+                # this station does not provide metadata embedded in the HLS playlist
+                return
+
+    LOGGER.debug(
+        "Start streaming HLS stream for url %s (selected substream %s)", url, substream_url
+    )
+
+    input_format = streamdetails.audio_format
+    output_format = streamdetails.audio_format
+    if streamdetails.audio_format.content_type == ContentType.UNKNOWN:
+        streamdetails.audio_format = AudioFormat(content_type=ContentType.AAC)
+        output_format = AudioFormat(content_type=ContentType.FLAC)
+
+    try:
+        metadata_task = asyncio.create_task(watch_metadata())
+        async for chunk in get_ffmpeg_stream(
+            audio_input=substream_url,
+            input_format=input_format,
+            output_format=output_format,
+        ):
+            yield chunk
+    finally:
+        if metadata_task and not metadata_task.done():
+            metadata_task.cancel()
 
 
 async def get_http_stream(
@@ -587,6 +521,7 @@ async def get_http_stream(
     seek_position: int = 0,
 ) -> AsyncGenerator[bytes, None]:
     """Get audio stream from HTTP."""
+    LOGGER.debug("Start HTTP stream for %s (seek_position %s)", streamdetails.uri, seek_position)
     if seek_position:
         assert streamdetails.duration, "Duration required for seek requests"
     # try to get filesize with a head request
@@ -606,7 +541,7 @@ async def get_http_stream(
     buffer_all = False
     bytes_received = 0
     timeout = ClientTimeout(total=0, connect=30, sock_read=5 * 60)
-    async with mass.http_session.get(url, headers=headers, timeout=timeout) as resp:
+    async with mass.http_session.get(url, headers=VLC_HEADERS, timeout=timeout) as resp:
         is_partial = resp.status == 206
         buffer_all = seek_position and not is_partial
         async for chunk in resp.content.iter_any():
@@ -624,6 +559,12 @@ async def get_http_stream(
     if buffer_all:
         skip_bytes = streamdetails.size / streamdetails.duration * seek_position
         yield buffer[:skip_bytes]
+    LOGGER.debug(
+        "Finished HTTP stream for %s (transferred %s/%s bytes)",
+        streamdetails.uri,
+        bytes_received,
+        streamdetails.size,
+    )
 
 
 async def get_file_stream(
@@ -658,6 +599,7 @@ async def get_ffmpeg_stream(
     filter_params: list[str] | None = None,
     extra_args: list[str] | None = None,
     chunk_size: int | None = None,
+    loglevel: str | None = None,
 ) -> AsyncGenerator[bytes, None]:
     """
     Get the ffmpeg audio stream as async generator.
@@ -665,6 +607,8 @@ async def get_ffmpeg_stream(
     Takes care of resampling and/or recoding if needed,
     according to player preferences.
     """
+    if loglevel is None:
+        loglevel = "info" if LOGGER.isEnabledFor(VERBOSE_LOG_LEVEL) else "quiet"
     use_stdin = not isinstance(audio_input, str)
     ffmpeg_args = get_ffmpeg_args(
         input_format=input_format,
@@ -673,7 +617,7 @@ async def get_ffmpeg_stream(
         extra_args=extra_args or [],
         input_path="-" if use_stdin else audio_input,
         output_path="-",
-        loglevel="info" if LOGGER.isEnabledFor(VERBOSE_LOG_LEVEL) else "quiet",
+        loglevel=loglevel,
     )
     async with AsyncProcess(
         ffmpeg_args,
@@ -943,13 +887,14 @@ def get_ffmpeg_args(
     return generic_args + input_args + extra_args + output_args
 
 
-def _parse_loudnorm(raw_stderr: bytes | str) -> LoudnessMeasurement | None:
+def parse_loudnorm(raw_stderr: bytes | str) -> LoudnessMeasurement | None:
     """Parse Loudness measurement from ffmpeg stderr output."""
     stderr_data = raw_stderr.decode() if isinstance(raw_stderr, bytes) else raw_stderr
     if "[Parsed_loudnorm_" not in stderr_data:
         return None
     stderr_data = stderr_data.split("[Parsed_loudnorm_")[1]
     stderr_data = stderr_data.rsplit("]")[-1].strip()
+    stderr_data = stderr_data.rsplit("}")[0].strip() + "}"
     try:
         loudness_data = json_loads(stderr_data)
     except JSON_DECODE_EXCEPTIONS:
index a8c2009c9a35cb12036d5950752c1180edf849dd..34052dbf27bde01f1ebd2e18cac7004c45deb009 100644 (file)
@@ -1,11 +1,14 @@
-"""Helpers for parsing playlists."""
+"""Helpers for parsing (online and offline) playlists."""
 
 from __future__ import annotations
 
+import configparser
 import logging
+from dataclasses import dataclass
 from typing import TYPE_CHECKING
+from urllib.parse import urlparse
 
-import aiohttp
+from aiohttp import client_exceptions
 
 from music_assistant.common.models.errors import InvalidDataError
 
@@ -14,41 +17,118 @@ if TYPE_CHECKING:
 
 
 LOGGER = logging.getLogger(__name__)
+HLS_CONTENT_TYPES = (
+    # https://tools.ietf.org/html/draft-pantos-http-live-streaming-19#section-10
+    "application/vnd.apple.mpegurl",
+    # Additional informal types used by Mozilla gecko not included as they
+    # don't reliably indicate HLS streams
+)
 
 
-async def parse_m3u(m3u_data: str) -> list[str]:
-    """Parse (only) filenames/urls from m3u playlist file."""
+class IsHLSPlaylist(InvalidDataError):
+    """The playlist from an HLS stream and should not be parsed."""
+
+
+@dataclass
+class PlaylistItem:
+    """Playlist item."""
+
+    path: str
+    length: str | None = None
+    title: str | None = None
+    stream_info: dict[str, str] | None = None
+
+    @property
+    def is_url(self) -> bool:
+        """Validate the URL can be parsed and at least has scheme + netloc."""
+        result = urlparse(self.path)
+        return all([result.scheme, result.netloc])
+
+
+def parse_m3u(m3u_data: str) -> list[PlaylistItem]:
+    """Very simple m3u parser.
+
+    Based on https://github.com/dvndrsn/M3uParser/blob/master/m3uparser.py
+    """
+    # From Mozilla gecko source: https://github.com/mozilla/gecko-dev/blob/c4c1adbae87bf2d128c39832d72498550ee1b4b8/dom/media/DecoderTraits.cpp#L47-L52
+
     m3u_lines = m3u_data.splitlines()
-    lines = []
+
+    playlist = []
+
+    length = None
+    title = None
+    stream_info = None
+
     for line in m3u_lines:
         line = line.strip()  # noqa: PLW2901
-        if line.startswith("#"):
-            # ignore metadata
+        if line.startswith("#EXTINF:"):
+            # Get length and title from #EXTINF line
+            info = line.split("#EXTINF:")[1].split(",", 1)
+            if len(info) != 2:
+                continue
+            length = info[0].strip()[0]
+            title = info[1].strip()
+        elif line.startswith("#EXT-X-STREAM-INF:"):
+            # HLS stream properties
+            # https://datatracker.ietf.org/doc/html/draft-pantos-http-live-streaming-19#section-10
+            stream_info = {}
+            for part in line.replace("#EXT-X-STREAM-INF:", "").split(","):
+                if "=" not in part:
+                    continue
+                kev_value_parts = part.strip().split("=")
+                stream_info[kev_value_parts[0]] = kev_value_parts[1]
+        elif line.startswith("#"):
+            # Ignore other extensions
             continue
-        if len(line) != 0:
-            # Get uri/path from all other, non-blank lines
-            lines.append(line)
+        elif len(line) != 0:
+            # Get song path from all other, non-blank lines
+            playlist.append(
+                PlaylistItem(path=line, length=length, title=title, stream_info=stream_info)
+            )
+            # reset the song variables so it doesn't use the same EXTINF more than once
+            length = None
+            title = None
+            stream_info = None
 
-    return lines
+    return playlist
 
 
-async def parse_pls(pls_data: str) -> list[str]:
+def parse_pls(pls_data: str) -> list[PlaylistItem]:
     """Parse (only) filenames/urls from pls playlist file."""
-    pls_lines = pls_data.splitlines()
-    lines = []
-    for line in pls_lines:
-        line = line.strip()  # noqa: PLW2901
-        if not line.startswith("File"):
-            # ignore metadata lines
-            continue
-        if "=" in line:
-            # Get uri/path from all other, non-blank lines
-            lines.append(line.split("=")[1])
+    pls_parser = configparser.ConfigParser()
+    try:
+        pls_parser.read_string(pls_data, "playlist")
+    except configparser.Error as err:
+        raise InvalidDataError("Can't parse playlist") from err
+
+    if "playlist" not in pls_parser or pls_parser["playlist"].getint("Version") != 2:
+        raise InvalidDataError("Invalid playlist")
 
-    return lines
+    try:
+        num_entries = pls_parser.getint("playlist", "NumberOfEntries")
+    except (configparser.NoOptionError, ValueError) as err:
+        raise InvalidDataError("Invalid NumberOfEntries in playlist") from err
 
+    playlist_section = pls_parser["playlist"]
 
-async def fetch_playlist(mass: MusicAssistant, url: str) -> list[str]:
+    playlist = []
+    for entry in range(1, num_entries + 1):
+        file_option = f"File{entry}"
+        if file_option not in playlist_section:
+            continue
+        itempath = playlist_section[file_option]
+        playlist.append(
+            PlaylistItem(
+                length=playlist_section.get(f"Length{entry}"),
+                title=playlist_section.get(f"Title{entry}"),
+                path=itempath,
+            )
+        )
+    return playlist
+
+
+async def fetch_playlist(mass: MusicAssistant, url: str) -> list[PlaylistItem]:
     """Parse an online m3u or pls playlist."""
     try:
         async with mass.http_session.get(url, timeout=5) as resp:
@@ -61,14 +141,17 @@ async def fetch_playlist(mass: MusicAssistant, url: str) -> list[str]:
     except TimeoutError as err:
         msg = f"Timeout while fetching playlist {url}"
         raise InvalidDataError(msg) from err
-    except aiohttp.client_exceptions.ClientError as err:
+    except client_exceptions.ClientError as err:
         msg = f"Error while fetching playlist {url}"
         raise InvalidDataError(msg) from err
 
+    if "#EXT-X-VERSION:" in playlist_data or "#EXT-X-STREAM-INF:" in playlist_data:
+        raise IsHLSPlaylist
+
     if url.endswith((".m3u", ".m3u8")):
-        playlist = await parse_m3u(playlist_data)
+        playlist = parse_m3u(playlist_data)
     else:
-        playlist = await parse_pls(playlist_data)
+        playlist = parse_pls(playlist_data)
 
     if not playlist:
         msg = f"Empty playlist {url}"
index c529bc1246fd16cc33fd05d17ae9d54398266303..fba1d4773d5f0e8d50e223c49ce0e467e89a2dec 100644 (file)
@@ -60,6 +60,7 @@ class AsyncProcess:
             self._custom_stdin = None
             self.attached_tasks.append(asyncio.create_task(self._feed_stdin(custom_stdin)))
         self._custom_stdout = custom_stdout
+        self._stderr_locked = asyncio.Lock()
 
     @property
     def closed(self) -> bool:
@@ -166,6 +167,7 @@ class AsyncProcess:
                 task.cancel()
                 with suppress(asyncio.CancelledError):
                     await task
+
         if self.proc.returncode is None:
             # always first try to send sigint signal to try clean shutdown
             # for example ffmpeg needs this to cleanly shutdown and not lock on pipes
@@ -179,8 +181,13 @@ class AsyncProcess:
             # especially with pipes this can cause deadlocks if not properly guarded
             # we need to use communicate to ensure buffers are flushed
             # we do that with sending communicate
+            if self._enable_stdin and not self.proc.stdin.is_closing():
+                self.proc.stdin.close()
             try:
-                await asyncio.wait_for(self.proc.communicate(), 2)
+                if self.proc.stdout and self._stderr_locked.locked():
+                    await asyncio.wait_for(self.proc.stdout.read(), 5)
+                else:
+                    await asyncio.wait_for(self.proc.communicate(), 5)
             except TimeoutError:
                 LOGGER.debug(
                     "Process %s with PID %s did not stop in time. Sending terminate...",
@@ -207,10 +214,11 @@ class AsyncProcess:
         stdout, stderr = await self.proc.communicate(input_data)
         return (stdout, stderr)
 
-    async def read_stderr(self) -> AsyncGenerator[bytes, None]:
-        """Read lines from the stderr stream."""
-        async for line in self.proc.stderr:
-            yield line
+    async def iter_stderr(self) -> AsyncGenerator[bytes, None]:
+        """Iterate lines from the stderr stream."""
+        async with self._stderr_locked:
+            async for line in self.proc.stderr:
+                yield line
 
     async def _feed_stdin(self, custom_stdin: AsyncGenerator[bytes, None]) -> None:
         """Feed stdin with chunks from an AsyncGenerator."""
index 1990879311e6409dbc31664b76dd6afafc564b38..b1ced2d82a1e4de65370174e9ca7f8873aa87430 100644 (file)
@@ -344,7 +344,7 @@ class AirplayStream:
         lost_packets = 0
         prev_metadata_checksum: str = ""
         prev_progress_report: float = 0
-        async for line in self._cliraop_proc.read_stderr():
+        async for line in self._cliraop_proc.iter_stderr():
             line = line.decode().strip()  # noqa: PLW2901
             if not line:
                 continue
index 974c7cd73b5a4766c56a0d0adcc22d3bbf457bc4..e29735f0b992d0d757134a3dd99611b4501a30ac 100644 (file)
@@ -493,13 +493,13 @@ class FileSystemProviderBase(MusicProvider):
             playlist_data = playlist_data.decode(encoding_details["encoding"] or "utf-8")
 
             if ext in ("m3u", "m3u8"):
-                playlist_lines = await parse_m3u(playlist_data)
+                playlist_lines = parse_m3u(playlist_data)
             else:
-                playlist_lines = await parse_pls(playlist_data)
+                playlist_lines = parse_pls(playlist_data)
 
-            for line_no, playlist_line in enumerate(playlist_lines, 1):
+            for line_no, playlist_line in enumerate(playlist_lines, 0):
                 if media_item := await self._parse_playlist_line(
-                    playlist_line, os.path.dirname(prov_playlist_id), line_no
+                    playlist_line.path, os.path.dirname(prov_playlist_id), line_no
                 ):
                     yield media_item
 
@@ -564,7 +564,6 @@ class FileSystemProviderBase(MusicProvider):
         if not await self.exists(prov_playlist_id):
             msg = f"Playlist path does not exist: {prov_playlist_id}"
             raise MediaNotFoundError(msg)
-        cur_lines = []
         _, ext = prov_playlist_id.rsplit(".", 1)
 
         # get playlist file contents
@@ -573,18 +572,20 @@ class FileSystemProviderBase(MusicProvider):
             playlist_data += chunk
         encoding_details = await asyncio.to_thread(cchardet.detect, playlist_data)
         playlist_data = playlist_data.decode(encoding_details["encoding"] or "utf-8")
-
+        # get current contents first
         if ext in ("m3u", "m3u8"):
-            playlist_lines = await parse_m3u(playlist_data)
+            playlist_items = parse_m3u(playlist_data)
         else:
-            playlist_lines = await parse_pls(playlist_data)
-
-        for line_no, playlist_line in enumerate(playlist_lines, 1):
-            if line_no not in positions_to_remove:
-                cur_lines.append(playlist_line)
-
-        new_playlist_data = "\n".join(cur_lines)
-        # write playlist file (always in utf-8)
+            playlist_items = parse_pls(playlist_data)
+        # remove items by index
+        for i in sorted(positions_to_remove, reverse=True):
+            del playlist_items[i]
+
+        # build new playlist data
+        new_playlist_data = "#EXTM3U\n"
+        for item in playlist_items:
+            new_playlist_data.append(f"#EXTINF:{item.length or 0},{item.title}\n")
+            new_playlist_data.append(f"{item.path}\n")
         await self.write_file_content(prov_playlist_id, new_playlist_data.encode("utf-8"))
 
     async def create_playlist(self, name: str) -> Playlist:
@@ -593,7 +594,7 @@ class FileSystemProviderBase(MusicProvider):
         # as creating a new (empty) file with the m3u extension...
         # filename = await self.resolve(f"{name}.m3u")
         filename = f"{name}.m3u"
-        await self.write_file_content(filename, b"")
+        await self.write_file_content(filename, b"#EXTM3U\n")
         return await self.get_playlist(filename)
 
     async def get_stream_details(self, item_id: str) -> StreamDetails:
index 1ffc8992dd8f9f7feefd95b49641363c4dbfed70..2f6f11fdbd8c591cce8a9178251ee251e166d689 100644 (file)
@@ -22,7 +22,6 @@ from music_assistant.common.models.media_items import (
     SearchResults,
 )
 from music_assistant.common.models.streamdetails import StreamDetails
-from music_assistant.server.helpers.audio import get_radio_stream, resolve_radio_stream
 from music_assistant.server.models.music_provider import MusicProvider
 
 SUPPORTED_FEATURES = (ProviderFeature.SEARCH, ProviderFeature.BROWSE)
@@ -283,7 +282,6 @@ class RadioBrowserProvider(MusicProvider):
         """Get streamdetails for a radio station."""
         stream = await self.radios.station(uuid=item_id)
         await self.radios.station_click(uuid=item_id)
-        url_resolved, supports_icy = await resolve_radio_stream(self.mass, stream.url_resolved)
         return StreamDetails(
             provider=self.domain,
             item_id=item_id,
@@ -291,16 +289,6 @@ class RadioBrowserProvider(MusicProvider):
                 content_type=ContentType.try_parse(stream.codec),
             ),
             media_type=MediaType.RADIO,
-            data=url_resolved,
-            direct=url_resolved if not supports_icy else None,
-            expires=time() + 24 * 3600,
+            direct=stream.url_resolved,
+            expires=time() + 3600,
         )
-
-    async def get_audio_stream(
-        self,
-        streamdetails: StreamDetails,
-        seek_position: int = 0,
-    ) -> AsyncGenerator[bytes, None]:
-        """Return the audio stream for the provider item."""
-        async for chunk in get_radio_stream(self.mass, streamdetails.data, streamdetails):
-            yield chunk
index 9bd8ff0e3bd7cb997e557248273d707b3c76eca2..b3e3242ea467cada48c42b19a2a6bf4f7ca30e25 100644 (file)
@@ -22,7 +22,6 @@ from music_assistant.common.models.media_items import (
 )
 from music_assistant.common.models.streamdetails import StreamDetails
 from music_assistant.constants import CONF_USERNAME
-from music_assistant.server.helpers.audio import get_radio_stream, resolve_radio_stream
 from music_assistant.server.helpers.tags import parse_tags
 from music_assistant.server.models.music_provider import MusicProvider
 
@@ -227,15 +226,14 @@ class TuneInProvider(MusicProvider):
                     content_type=ContentType.UNKNOWN,
                 ),
                 media_type=MediaType.RADIO,
-                data=item_id,
+                direct=item_id,
+                expires=time() + 3600,
             )
         stream_item_id, media_type = item_id.split("--", 1)
         stream_info = await self.__get_data("Tune.ashx", id=stream_item_id)
         for stream in stream_info["body"]:
             if stream["media_type"] != media_type:
                 continue
-            # check if the radio stream is not a playlist
-            url_resolved, supports_icy = await resolve_radio_stream(self.mass, stream["url"])
             return StreamDetails(
                 provider=self.domain,
                 item_id=item_id,
@@ -243,22 +241,12 @@ class TuneInProvider(MusicProvider):
                     content_type=ContentType(stream["media_type"]),
                 ),
                 media_type=MediaType.RADIO,
-                data=url_resolved,
-                expires=time() + 24 * 3600,
-                direct=url_resolved if not supports_icy else None,
+                direct=stream["url"],
+                expires=time() + 3600,
             )
         msg = f"Unable to retrieve stream details for {item_id}"
         raise MediaNotFoundError(msg)
 
-    async def get_audio_stream(
-        self,
-        streamdetails: StreamDetails,
-        seek_position: int = 0,
-    ) -> AsyncGenerator[bytes, None]:
-        """Return the audio stream for the provider item."""
-        async for chunk in get_radio_stream(self.mass, streamdetails.data, streamdetails):
-            yield chunk
-
     async def __get_data(self, endpoint: str, **kwargs):
         """Get data from api."""
         if endpoint.startswith("http"):
index e83886be10e5a0a3e741ad91fad2ad4caef4cf6d..c3ec5dd72ccba15bf389616406f6f38b995b4879 100644 (file)
@@ -6,6 +6,7 @@ import os
 from typing import TYPE_CHECKING
 
 from music_assistant.common.models.enums import ContentType, ImageType, MediaType
+from music_assistant.common.models.errors import MediaNotFoundError
 from music_assistant.common.models.media_items import (
     Artist,
     AudioFormat,
@@ -22,7 +23,6 @@ from music_assistant.server.helpers.audio import (
     get_radio_stream,
     resolve_radio_stream,
 )
-from music_assistant.server.helpers.playlists import fetch_playlist
 from music_assistant.server.helpers.tags import AudioTags, parse_tags
 from music_assistant.server.models.music_provider import MusicProvider
 
@@ -75,11 +75,25 @@ class URLProvider(MusicProvider):
 
     async def get_track(self, prov_track_id: str) -> Track:
         """Get full track details by id."""
+        # always prefer db item for existing items to not overwrite user customizations
+        db_item = await self.mass.music.tracks.get_library_item_by_prov_id(
+            prov_track_id, self.instance_id
+        )
+        if db_item is None and not prov_track_id.startswith("http"):
+            msg = f"Track not found: {prov_track_id}"
+            raise MediaNotFoundError(msg)
         return await self.parse_item(prov_track_id)
 
     async def get_radio(self, prov_radio_id: str) -> Radio:
         """Get full radio details by id."""
-        return await self.parse_item(prov_radio_id, force_radio=True)
+        # always prefer db item for existing items to not overwrite user customizations
+        db_item = await self.mass.music.radio.get_library_item_by_prov_id(
+            prov_radio_id, self.instance_id
+        )
+        if db_item is None and not prov_radio_id.startswith("http"):
+            msg = f"Radio not found: {prov_radio_id}"
+            raise MediaNotFoundError(msg)
+        return await self.parse_item(prov_radio_id)
 
     async def get_artist(self, prov_artist_id: str) -> Track:
         """Get full artist details by id."""
@@ -113,16 +127,16 @@ class URLProvider(MusicProvider):
 
     async def parse_item(
         self,
-        item_id_or_url: str,
+        url: str,
         force_refresh: bool = False,
         force_radio: bool = False,
     ) -> Track | Radio:
         """Parse plain URL to MediaItem of type Radio or Track."""
-        item_id, url, media_info = await self._get_media_info(item_id_or_url, force_refresh)
+        media_info = await self._get_media_info(url, force_refresh)
         is_radio = media_info.get("icy-name") or not media_info.duration
         provider_mappings = {
             ProviderMapping(
-                item_id=item_id,
+                item_id=url,
                 provider_domain=self.domain,
                 provider_instance=self.instance_id,
                 audio_format=AudioFormat(
@@ -136,14 +150,14 @@ class URLProvider(MusicProvider):
         if is_radio or force_radio:
             # treat as radio
             media_item = Radio(
-                item_id=item_id,
+                item_id=url,
                 provider=self.domain,
                 name=media_info.get("icy-name") or media_info.title,
                 provider_mappings=provider_mappings,
             )
         else:
             media_item = Track(
-                item_id=item_id,
+                item_id=url,
                 provider=self.domain,
                 name=media_info.title,
                 duration=int(media_info.duration or 0),
@@ -157,38 +171,25 @@ class URLProvider(MusicProvider):
             ]
         return media_item
 
-    async def _get_media_info(
-        self, item_id_or_url: str, force_refresh: bool = False
-    ) -> tuple[str, str, AudioTags]:
-        """Retrieve (cached) mediainfo for url."""
-        # check if the radio stream is not a playlist
-        if item_id_or_url.endswith(("m3u8", "m3u", "pls")):
-            playlist = await fetch_playlist(self.mass, item_id_or_url)
-            url = playlist[0]
-            item_id = item_id_or_url
-            self._full_url[item_id] = url
-        else:
-            url = self._full_url.get(item_id_or_url, item_id_or_url)
-            item_id = item_id_or_url
-        cache_key = f"{self.instance_id}.media_info.{item_id}"
+    async def _get_media_info(self, url: str, force_refresh: bool = False) -> AudioTags:
+        """Retrieve mediainfo for url."""
         # do we have some cached info for this url ?
+        cache_key = f"{self.instance_id}.media_info.{url}"
         cached_info = await self.mass.cache.get(cache_key)
         if cached_info and not force_refresh:
-            media_info = AudioTags.parse(cached_info)
-        else:
-            # parse info with ffprobe (and store in cache)
-            media_info = await parse_tags(url)
-            if "authSig" in url:
-                media_info.has_cover_image = False
-            await self.mass.cache.set(cache_key, media_info.raw)
-        return (item_id, url, media_info)
+            return AudioTags.parse(cached_info)
+        # parse info with ffprobe (and store in cache)
+        resolved_url, _, _ = await resolve_radio_stream(self.mass, url)
+        media_info = await parse_tags(resolved_url)
+        if "authSig" in url:
+            media_info.has_cover_image = False
+        await self.mass.cache.set(cache_key, media_info.raw)
+        return media_info
 
     async def get_stream_details(self, item_id: str) -> StreamDetails:
         """Get streamdetails for a track/radio."""
-        item_id, url, media_info = await self._get_media_info(item_id)
+        media_info = await self._get_media_info(item_id)
         is_radio = media_info.get("icy-name") or not media_info.duration
-        if is_radio:
-            url, supports_icy = await resolve_radio_stream(self.mass, url)
         return StreamDetails(
             provider=self.instance_id,
             item_id=item_id,
@@ -198,8 +199,8 @@ class URLProvider(MusicProvider):
                 bit_depth=media_info.bits_per_sample,
             ),
             media_type=MediaType.RADIO if is_radio else MediaType.TRACK,
-            direct=None if is_radio and supports_icy else url,
-            data=url,
+            direct=item_id if is_radio else None,
+            data=item_id,
         )
 
     async def get_audio_stream(