Fix streaming issues with YouTube Music (#1188)
authorMarcel van der Veldt <m.vanderveldt@outlook.com>
Thu, 28 Mar 2024 20:47:32 +0000 (21:47 +0100)
committerGitHub <noreply@github.com>
Thu, 28 Mar 2024 20:47:32 +0000 (21:47 +0100)
15 files changed:
music_assistant/common/models/enums.py
music_assistant/common/models/streamdetails.py
music_assistant/server/controllers/players.py
music_assistant/server/controllers/streams.py
music_assistant/server/helpers/audio.py
music_assistant/server/models/music_provider.py
music_assistant/server/providers/filesystem_local/base.py
music_assistant/server/providers/plex/__init__.py
music_assistant/server/providers/qobuz/__init__.py
music_assistant/server/providers/radiobrowser/__init__.py
music_assistant/server/providers/soundcloud/__init__.py
music_assistant/server/providers/tidal/__init__.py
music_assistant/server/providers/tunein/__init__.py
music_assistant/server/providers/url/__init__.py
music_assistant/server/providers/ytmusic/__init__.py

index 755e839d283ce56b3036bd455afbed8b1e6c6a73..2c6961259ccde785d12499c2cd764a9fe5d0c8b0 100644 (file)
@@ -2,6 +2,7 @@
 
 from __future__ import annotations
 
+import contextlib
 from enum import StrEnum
 from typing import Self
 
@@ -127,6 +128,7 @@ class ContentType(StrEnum):
     AIFF = "aiff"
     WMA = "wma"
     M4A = "m4a"
+    MP4 = "mp4"
     M4B = "m4b"
     DSF = "dsf"
     OPUS = "opus"
@@ -153,16 +155,15 @@ class ContentType(StrEnum):
         for splitter in (".", ","):
             if splitter in tempstr:
                 for val in tempstr.split(splitter):
-                    try:
-                        return cls(val.strip())
-                    except ValueError:
-                        pass
-
+                    with contextlib.suppress(ValueError):
+                        parsed = cls(val.strip())
+                    if parsed != ContentType.UNKNOWN:
+                        return parsed
         tempstr = tempstr.split("?")[0]
         tempstr = tempstr.split("&")[0]
         tempstr = tempstr.split(";")[0]
         tempstr = tempstr.replace("mp4", "m4a")
-        tempstr = tempstr.replace("mpd", "dash")
+        tempstr = tempstr.replace("mp4a", "m4a")
         try:
             return cls(tempstr)
         except ValueError:
index f958d5969a7a78a22ec4238047db1a6674380817..b87a5db7066fbd271641aec09caa9fb49b7ee8d5 100644 (file)
@@ -46,9 +46,8 @@ class StreamDetails(DataClassDictMixin):
     # expires: timestamp this streamdetails expire
     expires: float = time() + 3600
     # data: provider specific data (not exposed externally)
+    # this info is for example used to pass details to the get_audio_stream
     data: Any = None
-    # direct: if the url/file is supported by ffmpeg directly, use direct stream
-    direct: str | None = None
     # can_seek: bool to indicate that the providers 'get_audio_stream' supports seeking of the item
     can_seek: bool = True
 
index 720a6aeb5179b59abfa8046c3df9f3b32876c34b..b407e2f68c772f7cb0f5eed109914d3aff992c83 100644 (file)
@@ -1184,7 +1184,6 @@ class PlayerController(CoreController):
                     content_type=ContentType.try_parse(url),
                 ),
                 media_type=MediaType.ANNOUNCEMENT,
-                direct=url,
                 data={"url": url, "use_pre_announce": use_pre_announce},
                 target_loudness=-10,
             ),
index a2d565f708ed4801dcb3951cb09f6cf9e57b189c..01d17f45f4862eb527755cda42829a57fa887390 100644 (file)
@@ -48,7 +48,6 @@ from music_assistant.server.helpers.audio import (
     get_ffmpeg_args,
     get_ffmpeg_stream,
     get_player_filter_params,
-    get_radio_stream,
     parse_loudnorm,
     strip_silence,
 )
@@ -988,15 +987,6 @@ class StreamsController(CoreController):
 
         # collect all arguments for ffmpeg
         filter_params = []
-        extra_args = []
-        seek_pos = (
-            streamdetails.seek_position
-            if (streamdetails.direct or not streamdetails.can_seek)
-            else 0
-        )
-        if seek_pos:
-            # only use ffmpeg seeking if the provider stream does not support seeking
-            extra_args += ["-ss", str(seek_pos)]
         if streamdetails.target_loudness is not None:
             # add loudnorm filters
             filter_rule = f"loudnorm=I={streamdetails.target_loudness}:LRA=11:TP=-2"
@@ -1010,28 +1000,19 @@ class StreamsController(CoreController):
         if streamdetails.fade_in:
             filter_params.append("afade=type=in:start_time=0:duration=3")
 
-        if is_radio and streamdetails.direct and streamdetails.direct.startswith("http"):
-            # ensure we use the radio streamer for radio items
-            audio_source_iterator = get_radio_stream(self.mass, streamdetails.direct, streamdetails)
-            input_path = "-"
-        elif streamdetails.direct:
-            audio_source_iterator = None
-            input_path = streamdetails.direct
-        else:
-            audio_source_iterator = self.mass.get_provider(streamdetails.provider).get_audio_stream(
-                streamdetails,
-                seek_position=streamdetails.seek_position if streamdetails.can_seek else 0,
-            )
-            input_path = "-"
-
+        audio_source = self.mass.get_provider(streamdetails.provider).get_audio_stream(
+            streamdetails,
+            seek_position=streamdetails.seek_position,
+        )
         ffmpeg_args = get_ffmpeg_args(
             input_format=streamdetails.audio_format,
             output_format=pcm_format,
             filter_params=filter_params,
-            extra_args=extra_args,
-            input_path=input_path,
+            input_path="-",
             # loglevel info is needed for loudness measurement
             loglevel="info",
+            # we criple ffmpeg a bit on purpose with the filter_threads
+            # option so it doesn't consume all cpu when calculating loudnorm
             extra_input_args=["-filter_threads", "1"],
         )
 
@@ -1040,7 +1021,10 @@ class StreamsController(CoreController):
             stderr_data = ""
             async for line in ffmpeg_proc.iter_stderr():
                 line = line.decode().strip()  # noqa: PLW2901
-                # if streamdetails contenttype is uinknown, try pars eit from the ffmpeg log output
+                if not line:
+                    continue
+                logger.log(VERBOSE_LOG_LEVEL, line)
+                # if streamdetails contenttype is unknown, try parse it from the ffmpeg log output
                 # this has no actual usecase, other than displaying the correct codec in the UI
                 if (
                     streamdetails.audio_format.content_type == ContentType.UNKNOWN
@@ -1053,8 +1037,6 @@ class StreamsController(CoreController):
                     stderr_data += line
                 elif "HTTP error" in line:
                     logger.warning(line)
-                elif line:
-                    logger.log(VERBOSE_LOG_LEVEL, line)
                 del line
 
             # if we reach this point, the process is finished (finish or aborted)
@@ -1102,9 +1084,9 @@ class StreamsController(CoreController):
 
         async with AsyncProcess(
             ffmpeg_args,
-            enable_stdin=audio_source_iterator is not None,
+            enable_stdin=True,
             enable_stderr=True,
-            custom_stdin=audio_source_iterator,
+            custom_stdin=audio_source,
             name="ffmpeg_media_stream",
         ) as ffmpeg_proc:
             state_data = {"finished": asyncio.Event(), "bytes_sent": 0}
index a54491fa7388ab4198f1655982b79bd018a8d486..c18a33b0e652ce6e6f65c122ede035906f90c17e 100644 (file)
@@ -509,89 +509,6 @@ async def get_hls_stream(
                 logger.debug("Station support for in-band (ID3) metadata: %s", has_id3_metadata)
 
 
-async def get_hls_stream_org(
-    mass: MusicAssistant, url: str, streamdetails: StreamDetails
-) -> AsyncGenerator[bytes, None]:
-    """Get audio stream from HTTP HLS stream."""
-    timeout = ClientTimeout(total=0, connect=30, sock_read=5 * 60)
-    # fetch master playlist and select (best) child playlist
-    # https://datatracker.ietf.org/doc/html/draft-pantos-http-live-streaming-19#section-10
-    async with mass.http_session.get(url, headers=VLC_HEADERS, timeout=timeout) as resp:
-        charset = resp.charset or "utf-8"
-        master_m3u_data = await resp.text(charset)
-    substreams = parse_m3u(master_m3u_data)
-    if any(x for x in substreams if x.path.endswith(".ts")) or not all(
-        x for x in substreams if x.stream_info is not None
-    ):
-        # the url we got is already a substream
-        substream_url = url
-    else:
-        # sort substreams on best quality (highest bandwidth)
-        substreams.sort(key=lambda x: int(x.stream_info.get("BANDWIDTH", "0")), reverse=True)
-        substream = substreams[0]
-        substream_url = substream.path
-        if not substream_url.startswith("http"):
-            # path is relative, stitch it together
-            base_path = url.rsplit("/", 1)[0]
-            substream_url = base_path + "/" + substream.path
-
-    async def watch_metadata():
-        # ffmpeg is not (yet?) able to handle metadata updates that is provided
-        # in the substream playlist and/or the ID3 metadata
-        # so we do that here in a separate task.
-        # this also gets the basic
-        prev_chunk = ""
-        while True:
-            async with mass.http_session.get(
-                substream_url, headers=VLC_HEADERS, timeout=timeout
-            ) as resp:
-                charset = resp.charset or "utf-8"
-                substream_m3u_data = await resp.text(charset)
-            # get chunk-parts from the substream
-            hls_chunks = parse_m3u(substream_m3u_data)
-            metadata_found = False
-            for chunk_item in hls_chunks:
-                if chunk_item.path == prev_chunk:
-                    continue
-                chunk_item_url = chunk_item.path
-                if not chunk_item_url.startswith("http"):
-                    # path is relative, stitch it together
-                    base_path = substream_url.rsplit("/", 1)[0]
-                    chunk_item_url = base_path + "/" + chunk_item.path
-                if chunk_item.title and chunk_item.title != "no desc":
-                    streamdetails.stream_title = chunk_item.title
-                    metadata_found = True
-                # prevent that we play this chunk again if we loop through
-                prev_chunk = chunk_item.path
-                if chunk_item.length and chunk_item.length.isnumeric():
-                    await asyncio.sleep(int(chunk_item.length))
-                else:
-                    await asyncio.sleep(5)
-            if not metadata_found:
-                # this station does not provide metadata embedded in the HLS playlist
-                return
-
-    LOGGER.debug(
-        "Start streaming HLS stream for url %s (selected substream %s)", url, substream_url
-    )
-
-    if streamdetails.audio_format.content_type == ContentType.UNKNOWN:
-        streamdetails.audio_format = AudioFormat(content_type=ContentType.AAC)
-
-    try:
-        metadata_task = asyncio.create_task(watch_metadata())
-        async for chunk in get_ffmpeg_stream(
-            audio_input=substream_url,
-            input_format=streamdetails.audio_format,
-            # we need a self-explaining codec but not loose data from re-encoding
-            output_format=AudioFormat(content_type=ContentType.FLAC),
-        ):
-            yield chunk
-    finally:
-        if metadata_task and not metadata_task.done():
-            metadata_task.cancel()
-
-
 async def get_http_stream(
     mass: MusicAssistant,
     url: str,
@@ -603,40 +520,63 @@ async def get_http_stream(
     if seek_position:
         assert streamdetails.duration, "Duration required for seek requests"
     # try to get filesize with a head request
-    if seek_position and not streamdetails.size:
-        async with mass.http_session.head(url) as resp:
+    seek_supported = streamdetails.can_seek
+    if seek_position or not streamdetails.size:
+        async with mass.http_session.head(url, headers=VLC_HEADERS) as resp:
+            resp.raise_for_status()
             if size := resp.headers.get("Content-Length"):
                 streamdetails.size = int(size)
+            seek_supported = resp.headers.get("Accept-Ranges") == "bytes"
     # headers
-    headers = {}
+    headers = {**VLC_HEADERS}
+    timeout = ClientTimeout(total=0, connect=30, sock_read=5 * 60)
     skip_bytes = 0
     if seek_position and streamdetails.size:
         skip_bytes = int(streamdetails.size / streamdetails.duration * seek_position)
-        headers["Range"] = f"bytes={skip_bytes}-"
+        headers["Range"] = f"bytes={skip_bytes}-{streamdetails.size}"
+
+    # seeking an unknown or container format is not supported due to the (moov) headers
+    if seek_position and (
+        not seek_supported
+        or streamdetails.audio_format.content_type
+        in (
+            ContentType.UNKNOWN,
+            ContentType.M4A,
+            ContentType.M4B,
+        )
+    ):
+        LOGGER.debug(
+            "Seeking in %s (%s) not possible, fallback to ffmpeg seeking.",
+            streamdetails.uri,
+            streamdetails.audio_format.output_format_str,
+        )
+        async for chunk in get_ffmpeg_stream(
+            url,
+            # we must set the input content type to unknown to
+            # enforce ffmpeg to determine it from the headers
+            input_format=AudioFormat(content_type=ContentType.UNKNOWN),
+            # enforce wav as we dont want to re-encode lossy formats
+            # choose wav so we have descriptive headers and move on
+            output_format=AudioFormat(content_type=ContentType.WAV),
+            extra_input_args=["-ss", str(seek_position)],
+        ):
+            yield chunk
+        return
 
     # start the streaming from http
-    buffer = b""
-    buffer_all = False
     bytes_received = 0
-    timeout = ClientTimeout(total=0, connect=30, sock_read=5 * 60)
-    async with mass.http_session.get(url, headers=VLC_HEADERS, timeout=timeout) as resp:
+    async with mass.http_session.get(url, headers=headers, timeout=timeout) as resp:
         is_partial = resp.status == 206
-        buffer_all = seek_position and not is_partial
+        if seek_position and not is_partial:
+            raise InvalidDataError("HTTP source does not support seeking!")
+        resp.raise_for_status()
         async for chunk in resp.content.iter_any():
             bytes_received += len(chunk)
-            if buffer_all and not skip_bytes:
-                buffer += chunk
-                continue
-            if not is_partial and skip_bytes and bytes_received < skip_bytes:
-                continue
             yield chunk
 
     # store size on streamdetails for later use
     if not streamdetails.size:
         streamdetails.size = bytes_received
-    if buffer_all:
-        skip_bytes = streamdetails.size / streamdetails.duration * seek_position
-        yield buffer[:skip_bytes]
     LOGGER.debug(
         "Finished HTTP stream for %s (transferred %s/%s bytes)",
         streamdetails.uri,
@@ -678,6 +618,7 @@ async def get_ffmpeg_stream(
     extra_args: list[str] | None = None,
     chunk_size: int | None = None,
     loglevel: str | None = None,
+    extra_input_args: list[str] | None = None,
 ) -> AsyncGenerator[bytes, None]:
     """
     Get the ffmpeg audio stream as async generator.
@@ -696,6 +637,7 @@ async def get_ffmpeg_stream(
         input_path="-" if use_stdin else audio_input,
         output_path="-",
         loglevel=loglevel,
+        extra_input_args=extra_input_args or [],
     )
     async with AsyncProcess(
         ffmpeg_args,
@@ -733,50 +675,14 @@ async def get_preview_stream(
 ) -> AsyncGenerator[bytes, None]:
     """Create a 30 seconds preview audioclip for the given streamdetails."""
     music_prov = mass.get_provider(provider_instance_id_or_domain)
-
     streamdetails = await music_prov.get_stream_details(track_id)
-
-    input_args = [
-        "ffmpeg",
-        "-hide_banner",
-        "-loglevel",
-        "quiet",
-        "-ignore_unknown",
-    ]
-    if streamdetails.direct:
-        input_args += ["-ss", "30", "-i", streamdetails.direct]
-    else:
-        # the input is received from pipe/stdin
-        if streamdetails.audio_format.content_type != ContentType.UNKNOWN:
-            input_args += ["-f", streamdetails.audio_format.content_type]
-        input_args += ["-i", "-"]
-
-    output_args = ["-to", "30", "-f", "mp3", "-"]
-    args = input_args + output_args
-
-    writer_task: asyncio.Task | None = None
-    ffmpeg_proc = AsyncProcess(args, enable_stdin=True, enable_stdout=True, enable_stderr=False)
-    await ffmpeg_proc.start()
-
-    async def writer() -> None:
-        """Task that grabs the source audio and feeds it to ffmpeg."""
-        music_prov = mass.get_provider(streamdetails.provider)
-        async for audio_chunk in music_prov.get_audio_stream(streamdetails, 30):
-            await ffmpeg_proc.write(audio_chunk)
-        # write eof when last packet is received
-        await ffmpeg_proc.write_eof()
-
-    if not streamdetails.direct:
-        writer_task = asyncio.create_task(writer())
-
-    # yield chunks from stdout
-    try:
-        async for chunk in ffmpeg_proc.iter_any():
-            yield chunk
-    finally:
-        if writer_task and not writer_task.done():
-            writer_task.cancel()
-        await ffmpeg_proc.close()
+    async for chunk in get_ffmpeg_stream(
+        audio_input=music_prov.get_audio_stream(streamdetails, 30),
+        input_format=streamdetails.audio_format,
+        output_format=AudioFormat(content_type=ContentType.MP3),
+        extra_input_args=["-to", "30"],
+    ):
+        yield chunk
 
 
 async def get_silence(
@@ -938,13 +844,20 @@ def get_ffmpeg_args(
             input_format.content_type.name.lower(),
             "-f",
             input_format.content_type.value,
+            "-i",
+            input_path,
         ]
-    input_args += ["-i", input_path]
+    elif input_format.content_type == ContentType.UNKNOWN:
+        # let ffmpeg guess/auto detect the content type
+        input_args += ["-i", input_path]
+    else:
+        # use explicit format identifier for all other
+        input_args += ["-f", input_format.content_type.value, "-i", input_path]
 
     # collect output args
     if output_path.upper() == "NULL":
         output_args = ["-f", "null", "-"]
-    else:
+    elif output_format.content_type.is_pcm():
         output_args = [
             "-acodec",
             output_format.content_type.name.lower(),
@@ -956,6 +869,12 @@ def get_ffmpeg_args(
             str(output_format.sample_rate),
             output_path,
         ]
+    elif output_format.content_type == ContentType.UNKNOWN:
+        # use wav so we at least have some headers for the rest of the chain
+        output_args = ["-f", "wav", output_path]
+    else:
+        # use explicit format identifier for all other
+        output_args = ["-f", output_format.content_type.value, output_path]
 
     # prefer libsoxr high quality resampler (if present) for sample rate conversions
     if input_format.sample_rate != output_format.sample_rate and libsoxr_support:
index d6e24d04f000d60ad4754fe583d99100012bdc30..5300405c37f3f5a721bc9b3aba58ff8a1485d818 100644 (file)
@@ -252,8 +252,7 @@ class MusicProvider(Provider):
         self, streamdetails: StreamDetails, seek_position: int = 0
     ) -> AsyncGenerator[bytes, None]:
         """Return the audio stream for the provider item."""
-        if streamdetails.direct is None:
-            raise NotImplementedError
+        raise NotImplementedError
 
     async def on_streamed(self, streamdetails: StreamDetails, seconds_streamed: int) -> None:
         """Handle callback when an item completed streaming."""
index c8544c8163f3f8d4e750f1d0108741ff0db0874c..b926837ee2c773cc24b8e6481139a4fbcce43f45 100644 (file)
@@ -615,7 +615,7 @@ class FileSystemProviderBase(MusicProvider):
             media_type=MediaType.TRACK,
             duration=library_item.duration,
             size=file_item.file_size,
-            direct=file_item.local_path,
+            data=file_item.local_path,
             can_seek=prov_mapping.audio_format.content_type in SEEKABLE_FILES,
         )
 
index 888357671e6f36e8a61f540d620614e8585faa22..93367f059fa64e30e74700b15a0bd5cadefeff11 100644 (file)
@@ -9,7 +9,6 @@ from typing import TYPE_CHECKING, Any
 
 import plexapi.exceptions
 import requests
-from aiohttp import ClientTimeout
 from plexapi.audio import Album as PlexAlbum
 from plexapi.audio import Artist as PlexArtist
 from plexapi.audio import Playlist as PlexPlaylist
@@ -47,6 +46,7 @@ from music_assistant.common.models.media_items import (
     Track,
 )
 from music_assistant.common.models.streamdetails import StreamDetails
+from music_assistant.server.helpers.audio import get_http_stream
 from music_assistant.server.helpers.auth import AuthenticationHelper
 from music_assistant.server.helpers.tags import parse_tags
 from music_assistant.server.models.music_provider import MusicProvider
@@ -760,7 +760,7 @@ class PlexProvider(MusicProvider):
         )
 
         if media_type != ContentType.M4A:
-            stream_details.direct = self._plex_server.url(media_part.key, True)
+            stream_details.data = self._plex_server.url(media_part.key, True)
             if audio_stream.samplingRate:
                 stream_details.audio_format.sample_rate = audio_stream.samplingRate
             if audio_stream.bitDepth:
@@ -781,12 +781,12 @@ class PlexProvider(MusicProvider):
         self, streamdetails: StreamDetails, seek_position: int = 0
     ) -> AsyncGenerator[bytes, None]:
         """Return the audio stream for the provider item."""
-        url = streamdetails.data.getStreamURL(offset=seek_position)
-
-        timeout = ClientTimeout(total=0, connect=30, sock_read=600)
-        async with self.mass.http_session.get(url, timeout=timeout) as resp:
-            async for chunk in resp.content.iter_any():
-                yield chunk
+        if isinstance(streamdetails.data, str):
+            url = streamdetails.data
+        else:
+            url = streamdetails.data.getStreamURL(offset=seek_position)
+        async for chunk in get_http_stream(self.mass, url, streamdetails, 0):
+            yield chunk
 
     async def get_myplex_account_and_refresh_token(self, auth_token: str) -> MyPlexAccount:
         """Get a MyPlexAccount object and refresh the token if needed."""
index f4f20291c826cc88d252a7fc7098e1992d0f7f5a..dbb07534a11716e2b679d449c815ec91350daea8 100644 (file)
@@ -43,6 +43,7 @@ from music_assistant.constants import (
 from music_assistant.server.helpers.app_vars import app_var
 
 # pylint: enable=no-name-in-module
+from music_assistant.server.helpers.audio import get_http_stream
 from music_assistant.server.models.music_provider import MusicProvider
 
 if TYPE_CHECKING:
@@ -400,8 +401,6 @@ class QobuzProvider(MusicProvider):
         else:
             msg = f"Unsupported mime type for {item_id}"
             raise MediaNotFoundError(msg)
-        # report playback started as soon as the streamdetails are requested
-        self.mass.create_task(self._report_playback_started(streamdata))
         return StreamDetails(
             item_id=str(item_id),
             provider=self.instance_id,
@@ -412,10 +411,20 @@ class QobuzProvider(MusicProvider):
             ),
             duration=streamdata["duration"],
             data=streamdata,  # we need these details for reporting playback
-            expires=time.time() + 60,  # may not be cached
-            direct=streamdata["url"],
+            expires=time.time() + 300,  # url expires very fast
         )
 
+    async def get_audio_stream(
+        self, streamdetails: StreamDetails, seek_position: int = 0
+    ) -> AsyncGenerator[bytes, None]:
+        """Return the audio stream for the provider item."""
+        # report playback started as soon as we start streaming
+        self.mass.create_task(self._report_playback_started(streamdetails.data))
+        async for chunk in get_http_stream(
+            self.mass, streamdetails.data["url"], streamdetails, seek_position
+        ):
+            yield chunk
+
     async def _report_playback_started(self, streamdata: dict) -> None:
         """Report playback start to qobuz."""
         # TODO: need to figure out if the streamed track is purchased by user
index 2f6f11fdbd8c591cce8a9178251ee251e166d689..066f923a3225e73885bc00b458185e36e0b9628b 100644 (file)
@@ -22,6 +22,7 @@ from music_assistant.common.models.media_items import (
     SearchResults,
 )
 from music_assistant.common.models.streamdetails import StreamDetails
+from music_assistant.server.helpers.audio import get_radio_stream
 from music_assistant.server.models.music_provider import MusicProvider
 
 SUPPORTED_FEATURES = (ProviderFeature.SEARCH, ProviderFeature.BROWSE)
@@ -289,6 +290,14 @@ class RadioBrowserProvider(MusicProvider):
                 content_type=ContentType.try_parse(stream.codec),
             ),
             media_type=MediaType.RADIO,
-            direct=stream.url_resolved,
+            data=stream.url_resolved,
             expires=time() + 3600,
         )
+
+    async def get_audio_stream(
+        self, streamdetails: StreamDetails, seek_position: int = 0
+    ) -> AsyncGenerator[bytes, None]:
+        """Return the audio stream for the provider item."""
+        # report playback started as soon as we start streaming
+        async for chunk in get_radio_stream(self.mass, streamdetails.data, streamdetails, 0):
+            yield chunk
index 91aac62b66bd1bf574e10441f337cb79ddd0e263..f1f8292af1d460a8e9a599ff8199bacd31f6bbe5 100644 (file)
@@ -24,6 +24,11 @@ from music_assistant.common.models.media_items import (
     Track,
 )
 from music_assistant.common.models.streamdetails import StreamDetails
+from music_assistant.server.helpers.audio import (
+    get_hls_stream,
+    get_http_stream,
+    resolve_radio_stream,
+)
 from music_assistant.server.models.music_provider import MusicProvider
 
 from .soundcloudpy.asyncsoundcloudpy import SoundcloudAsyncAPI
@@ -310,9 +315,23 @@ class SoundcloudMusicProvider(MusicProvider):
             audio_format=AudioFormat(
                 content_type=ContentType.try_parse(stream_format),
             ),
-            direct=url,
+            data=url,
         )
 
+    async def get_audio_stream(
+        self, streamdetails: StreamDetails, seek_position: int = 0
+    ) -> AsyncGenerator[bytes, None]:
+        """Return the audio stream for the provider item."""
+        resolved_url, _, is_hls = await resolve_radio_stream(self.mass, streamdetails.data)
+        if is_hls:
+            # some soundcloud streams are HLS, prefer the radio streamer
+            async for chunk in get_hls_stream(self.mass, resolved_url, streamdetails):
+                yield chunk
+            return
+        # regular stream from http
+        async for chunk in get_http_stream(self.mass, resolved_url, streamdetails, seek_position):
+            yield chunk
+
     async def _parse_artist(self, artist_obj: dict) -> Artist:
         """Parse a Soundcloud user response to Artist model object."""
         artist_id = None
index 4ea764c8dfb3829fe673759dc357c868f3f9b171..57b7b10756e18823f36878e4cba4b54a82ca2429 100644 (file)
@@ -44,6 +44,7 @@ from music_assistant.common.models.media_items import (
     Track,
 )
 from music_assistant.common.models.streamdetails import StreamDetails
+from music_assistant.server.helpers.audio import get_http_stream
 from music_assistant.server.helpers.auth import AuthenticationHelper
 from music_assistant.server.helpers.tags import AudioTags, parse_tags
 from music_assistant.server.models.music_provider import MusicProvider
@@ -439,9 +440,19 @@ class TidalProvider(MusicProvider):
                 channels=media_info.channels,
             ),
             duration=track.duration,
-            direct=url,
+            data=url,
         )
 
+    async def get_audio_stream(
+        self, streamdetails: StreamDetails, seek_position: int = 0
+    ) -> AsyncGenerator[bytes, None]:
+        """Return the audio stream for the provider item."""
+        # report playback started as soon as we start streaming
+        async for chunk in get_http_stream(
+            self.mass, streamdetails.data, streamdetails, seek_position
+        ):
+            yield chunk
+
     async def get_artist(self, prov_artist_id: str) -> Artist:
         """Get artist details for given artist id."""
         tidal_session = await self._get_tidal_session()
index b3e3242ea467cada48c42b19a2a6bf4f7ca30e25..a7a8369ba7a9188a944ec322ae5c6855fac0836f 100644 (file)
@@ -22,6 +22,7 @@ from music_assistant.common.models.media_items import (
 )
 from music_assistant.common.models.streamdetails import StreamDetails
 from music_assistant.constants import CONF_USERNAME
+from music_assistant.server.helpers.audio import get_radio_stream
 from music_assistant.server.helpers.tags import parse_tags
 from music_assistant.server.models.music_provider import MusicProvider
 
@@ -226,7 +227,7 @@ class TuneInProvider(MusicProvider):
                     content_type=ContentType.UNKNOWN,
                 ),
                 media_type=MediaType.RADIO,
-                direct=item_id,
+                data=item_id,
                 expires=time() + 3600,
             )
         stream_item_id, media_type = item_id.split("--", 1)
@@ -241,12 +242,20 @@ class TuneInProvider(MusicProvider):
                     content_type=ContentType(stream["media_type"]),
                 ),
                 media_type=MediaType.RADIO,
-                direct=stream["url"],
+                data=stream["url"],
                 expires=time() + 3600,
             )
         msg = f"Unable to retrieve stream details for {item_id}"
         raise MediaNotFoundError(msg)
 
+    async def get_audio_stream(
+        self, streamdetails: StreamDetails, seek_position: int = 0
+    ) -> AsyncGenerator[bytes, None]:
+        """Return the audio stream for the provider item."""
+        # report playback started as soon as we start streaming
+        async for chunk in get_radio_stream(self.mass, streamdetails.data, streamdetails, 0):
+            yield chunk
+
     async def __get_data(self, endpoint: str, **kwargs):
         """Get data from api."""
         if endpoint.startswith("http"):
index bffb8cbcaefab6af84db16c4d53b6a22f2973b17..9a4d950fddca4a5ddfcd1087ee35a74974769988 100644 (file)
@@ -199,8 +199,7 @@ class URLProvider(MusicProvider):
                 bit_depth=media_info.bits_per_sample,
             ),
             media_type=MediaType.RADIO if is_radio else MediaType.TRACK,
-            direct=item_id if is_radio else None,
-            data=item_id,
+            data={"url": item_id},
         )
 
     async def get_audio_stream(
@@ -209,17 +208,19 @@ class URLProvider(MusicProvider):
         """Return the audio stream for the provider item."""
         if streamdetails.media_type == MediaType.RADIO:
             # radio stream url
-            async for chunk in get_radio_stream(self.mass, streamdetails.data, streamdetails):
+            async for chunk in get_radio_stream(
+                self.mass, streamdetails.data["url"], streamdetails
+            ):
                 yield chunk
         elif os.path.isfile(streamdetails.data):
             # local file
             async for chunk in get_file_stream(
-                self.mass, streamdetails.data, streamdetails, seek_position
+                self.mass, streamdetails.data["url"], streamdetails, seek_position
             ):
                 yield chunk
         else:
             # regular stream url (without icy meta)
             async for chunk in get_http_stream(
-                self.mass, streamdetails.data, streamdetails, seek_position
+                self.mass, streamdetails.data["url"], streamdetails, seek_position
             ):
                 yield chunk
index e54b366e977cec26f67754712e8e8694de0454c6..76fc93d263474cdb19c7a6d468816738e1534919 100644 (file)
@@ -12,6 +12,7 @@ from typing import TYPE_CHECKING
 from urllib.parse import unquote
 
 import pytube
+from aiohttp import ClientResponseError
 
 from music_assistant.common.models.config_entries import ConfigEntry, ConfigValueType
 from music_assistant.common.models.enums import ConfigEntryType, ProviderFeature
@@ -39,6 +40,7 @@ from music_assistant.common.models.media_items import (
     Track,
 )
 from music_assistant.common.models.streamdetails import StreamDetails
+from music_assistant.server.helpers.audio import get_http_stream
 from music_assistant.server.helpers.auth import AuthenticationHelper
 from music_assistant.server.models.music_provider import MusicProvider
 
@@ -533,7 +535,7 @@ class YoutubeMusicProvider(MusicProvider):
             audio_format=AudioFormat(
                 content_type=ContentType.try_parse(stream_format["mimeType"]),
             ),
-            direct=url,
+            data=url,
         )
         if (
             track_obj["streamingData"].get("expiresInSeconds")
@@ -550,6 +552,27 @@ class YoutubeMusicProvider(MusicProvider):
             stream_details.audio_format.sample_rate = int(stream_format.get("audioSampleRate"))
         return stream_details
 
+    async def get_audio_stream(
+        self, streamdetails: StreamDetails, seek_position: int = 0
+    ) -> AsyncGenerator[bytes, None]:
+        """Return the audio stream for the provider item."""
+        is_retry = False
+        while True:
+            try:
+                async for chunk in get_http_stream(
+                    self.mass, streamdetails.data, streamdetails, seek_position
+                ):
+                    yield chunk
+                return
+            except ClientResponseError as err:
+                if not is_retry and err.status == 403:
+                    # cipher expired, get a fresh one
+                    self.logger.warning("Cipher expired, trying to refresh...")
+                    streamdetails = await self.get_stream_details(streamdetails.item_id)
+                    continue
+                # raise for all other cases or we have already retried
+                raise
+
     async def _post_data(self, endpoint: str, data: dict[str, str], **kwargs):
         """Post data to the given endpoint."""
         await self._check_oauth_token()