Fix issues with streaming YTM and some radio stations (#1193)
authorMarcel van der Veldt <m.vanderveldt@outlook.com>
Mon, 1 Apr 2024 15:21:08 +0000 (17:21 +0200)
committerGitHub <noreply@github.com>
Mon, 1 Apr 2024 15:21:08 +0000 (17:21 +0200)
24 files changed:
music_assistant/common/models/enums.py
music_assistant/common/models/streamdetails.py
music_assistant/server/controllers/players.py
music_assistant/server/controllers/streams.py
music_assistant/server/helpers/audio.py
music_assistant/server/helpers/process.py
music_assistant/server/models/music_provider.py
music_assistant/server/providers/airplay/__init__.py
music_assistant/server/providers/deezer/__init__.py
music_assistant/server/providers/filesystem_local/__init__.py
music_assistant/server/providers/filesystem_local/base.py
music_assistant/server/providers/jellyfin/__init__.py
music_assistant/server/providers/opensubsonic/sonic_provider.py
music_assistant/server/providers/plex/__init__.py
music_assistant/server/providers/qobuz/__init__.py
music_assistant/server/providers/radiobrowser/__init__.py
music_assistant/server/providers/snapcast/__init__.py
music_assistant/server/providers/sonos/player.py
music_assistant/server/providers/soundcloud/__init__.py
music_assistant/server/providers/spotify/__init__.py
music_assistant/server/providers/tidal/__init__.py
music_assistant/server/providers/tunein/__init__.py
music_assistant/server/providers/url/__init__.py
music_assistant/server/providers/ytmusic/__init__.py

index 2c6961259ccde785d12499c2cd764a9fe5d0c8b0..75a2d53a45c1c8080c7c6d4d388255d636956627 100644 (file)
@@ -397,3 +397,11 @@ class ConfigEntryType(StrEnum):
     def _missing_(cls: Self, value: object) -> Self:  # noqa: ARG003
         """Set default enum member if an unknown value is provided."""
         return cls.UNKNOWN
+
+
+class StreamType(StrEnum):
+    """Enum for the type of streamdetails."""
+
+    HTTP = "http"
+    LOCAL_FILE = "local_file"
+    CUSTOM = "custom"
index b87a5db7066fbd271641aec09caa9fb49b7ee8d5..230c5acfece042e2c8ce3ef81854b1e560c643ec 100644 (file)
@@ -3,12 +3,11 @@
 from __future__ import annotations
 
 from dataclasses import dataclass
-from time import time
 from typing import Any
 
 from mashumaro import DataClassDictMixin
 
-from music_assistant.common.models.enums import MediaType
+from music_assistant.common.models.enums import MediaType, StreamType
 from music_assistant.common.models.media_items import AudioFormat
 
 
@@ -36,6 +35,8 @@ class StreamDetails(DataClassDictMixin):
     item_id: str
     audio_format: AudioFormat
     media_type: MediaType = MediaType.TRACK
+    stream_type: StreamType = StreamType.CUSTOM
+    path: str | None = None
 
     # stream_title: radio streams can optionally set this field
     stream_title: str | None = None
@@ -43,14 +44,14 @@ class StreamDetails(DataClassDictMixin):
     duration: int | None = None
     # total size in bytes of the item, calculated at eof when omitted
     size: int | None = None
-    # expires: timestamp this streamdetails expire
-    expires: float = time() + 3600
     # data: provider specific data (not exposed externally)
     # this info is for example used to pass details to the get_audio_stream
     data: Any = None
     # can_seek: bool to indicate that the providers 'get_audio_stream' supports seeking of the item
     can_seek: bool = True
 
+    # stream_type:
+
     # the fields below will be set/controlled by the streamcontroller
     seek_position: int = 0
     fade_in: bool = False
index b407e2f68c772f7cb0f5eed109914d3aff992c83..52a09eea8c3292f1c21d312c71e81e81a1812bb8 100644 (file)
@@ -28,6 +28,7 @@ from music_assistant.common.models.enums import (
     PlayerType,
     ProviderFeature,
     ProviderType,
+    StreamType,
 )
 from music_assistant.common.models.errors import (
     AlreadyRegisteredError,
@@ -1183,8 +1184,10 @@ class PlayerController(CoreController):
                 audio_format=AudioFormat(
                     content_type=ContentType.try_parse(url),
                 ),
+                stream_type=StreamType.HTTP,
                 media_type=MediaType.ANNOUNCEMENT,
                 data={"url": url, "use_pre_announce": use_pre_announce},
+                path=url,
                 target_loudness=-10,
             ),
         )
index 826d5fd1ed8e2de70d8688be4c267f677bbe81e3..1516250ac684d6fcf840919789c07bdb499955d4 100644 (file)
@@ -15,7 +15,7 @@ import time
 import urllib.parse
 from collections.abc import AsyncGenerator
 from contextlib import suppress
-from typing import TYPE_CHECKING, Any
+from typing import TYPE_CHECKING
 
 import shortuuid
 from aiofiles.os import wrap
@@ -27,8 +27,8 @@ from music_assistant.common.models.config_entries import (
     ConfigValueOption,
     ConfigValueType,
 )
-from music_assistant.common.models.enums import ConfigEntryType, ContentType, MediaType
-from music_assistant.common.models.errors import AudioError, QueueEmpty
+from music_assistant.common.models.enums import ConfigEntryType, ContentType, MediaType, StreamType
+from music_assistant.common.models.errors import QueueEmpty
 from music_assistant.common.models.media_items import AudioFormat
 from music_assistant.common.models.streamdetails import StreamDetails
 from music_assistant.constants import (
@@ -41,19 +41,20 @@ from music_assistant.constants import (
     CONF_PUBLISH_IP,
     SILENCE_FILE,
     UGP_PREFIX,
-    VERBOSE_LOG_LEVEL,
 )
 from music_assistant.server.helpers.audio import LOGGER as AUDIO_LOGGER
 from music_assistant.server.helpers.audio import (
+    FFMpeg,
     check_audio_support,
     crossfade_pcm_parts,
-    get_ffmpeg_args,
     get_ffmpeg_stream,
+    get_hls_stream,
+    get_icy_stream,
     get_player_filter_params,
     parse_loudnorm,
+    resolve_radio_stream,
     strip_silence,
 )
-from music_assistant.server.helpers.process import AsyncProcess
 from music_assistant.server.helpers.util import get_ips
 from music_assistant.server.helpers.webserver import Webserver
 from music_assistant.server.models.core_controller import CoreController
@@ -985,8 +986,8 @@ class StreamsController(CoreController):
             strip_silence_end = False
         # pcm_sample_size = chunk size = 1 second of pcm audio
         pcm_sample_size = pcm_format.pcm_sample_size
-        buffer_size_begin = pcm_sample_size * 2 if strip_silence_begin else pcm_sample_size
-        buffer_size_end = pcm_sample_size * 5 if strip_silence_end else pcm_sample_size
+        buffer_size_begin = pcm_sample_size * 3 if strip_silence_begin else pcm_sample_size * 2
+        buffer_size_end = pcm_sample_size * 6 if strip_silence_end else pcm_sample_size * 2
 
         # collect all arguments for ffmpeg
         filter_params = []
@@ -1003,95 +1004,110 @@ class StreamsController(CoreController):
         if streamdetails.fade_in:
             filter_params.append("afade=type=in:start_time=0:duration=3")
 
-        ffmpeg_args = get_ffmpeg_args(
+        if streamdetails.stream_type == StreamType.CUSTOM:
+            audio_source = self.mass.get_provider(streamdetails.provider).get_audio_stream(
+                streamdetails,
+                seek_position=streamdetails.seek_position,
+            )
+        elif streamdetails.media_type == MediaType.RADIO:
+            resolved_url, supports_icy, is_hls = await resolve_radio_stream(
+                self.mass, streamdetails.path
+            )
+            if supports_icy:
+                audio_source = get_icy_stream(self.mass, resolved_url, streamdetails)
+            elif is_hls:
+                audio_source = get_hls_stream(self.mass, resolved_url, streamdetails)
+            else:
+                audio_source = resolved_url
+        else:
+            audio_source = streamdetails.path
+        extra_input_args = []
+        if streamdetails.seek_position and streamdetails.stream_type != StreamType.CUSTOM:
+            extra_input_args += ["-ss", str(int(streamdetails.seek_position))]
+        logger.debug("start media stream for: %s", streamdetails.uri)
+        state_data = {"finished": asyncio.Event(), "bytes_sent": 0}
+
+        async with FFMpeg(
+            audio_input=audio_source,
             input_format=streamdetails.audio_format,
             output_format=pcm_format,
             filter_params=filter_params,
-            input_path="-",
-            # loglevel info is needed for loudness measurement
-            loglevel="info",
             # we criple ffmpeg a bit on purpose with the filter_threads
             # option so it doesn't consume all cpu when calculating loudnorm
-            extra_input_args=["-filter_threads", "1"],
-        )
+            extra_input_args=[*extra_input_args, "-filter_threads", "1"],
+            name="ffmpeg_media_stream",
+            stderr_enabled=True,
+        ) as ffmpeg_proc:
 
-        async def log_reader(ffmpeg_proc: AsyncProcess, state_data: dict[str, Any]):
-            # To prevent stderr locking up, we must keep reading it
-            stderr_data = ""
-            async for line in ffmpeg_proc.iter_stderr():
-                line = line.decode().strip()  # noqa: PLW2901
-                if not line:
-                    continue
-                logger.log(VERBOSE_LOG_LEVEL, line)
-                # if streamdetails contenttype is unknown, try parse it from the ffmpeg log output
-                # this has no actual usecase, other than displaying the correct codec in the UI
-                if (
-                    streamdetails.audio_format.content_type == ContentType.UNKNOWN
-                    and line.startswith("Stream #0:0: Audio: ")
-                ):
-                    streamdetails.audio_format.content_type = ContentType.try_parse(
-                        line.split("Stream #0:0: Audio: ")[1].split(" ")[0]
-                    )
-                if stderr_data or "loudnorm" in line:
-                    stderr_data += line
-                elif "HTTP error" in line:
-                    logger.warning(line)
-                del line
-
-            # if we reach this point, the process is finished (finish or aborted)
-            if ffmpeg_proc.returncode == 0:
-                await state_data["finished"].wait()
-            finished = ffmpeg_proc.returncode == 0 and state_data["finished"].is_set()
-            bytes_sent = state_data["bytes_sent"]
-            seconds_streamed = bytes_sent / pcm_format.pcm_sample_size if bytes_sent else 0
-            streamdetails.seconds_streamed = seconds_streamed
-            state_str = "finished" if finished else "aborted"
-            logger.debug(
-                "stream %s for: %s (%s seconds streamed, exitcode %s)",
-                state_str,
-                streamdetails.uri,
-                seconds_streamed,
-                ffmpeg_proc.returncode,
-            )
-            # store accurate duration
-            if finished:
-                streamdetails.duration = streamdetails.seek_position + seconds_streamed
-
-            # parse loudnorm data if we have that collected
-            if stderr_data and (loudness_details := parse_loudnorm(stderr_data)):
-                required_seconds = 600 if streamdetails.media_type == MediaType.RADIO else 120
-                if finished or (seconds_streamed >= required_seconds):
-                    logger.debug(
-                        "Loudness measurement for %s: %s", streamdetails.uri, loudness_details
-                    )
-                    streamdetails.loudness = loudness_details
-                    await self.mass.music.set_track_loudness(
-                        streamdetails.item_id, streamdetails.provider, loudness_details
-                    )
+            async def log_reader():
+                # To prevent stderr locking up, we must keep reading it
+                stderr_data = ""
+                async for line in ffmpeg_proc.iter_stderr():
+                    if "error" in line or "warning" in line:
+                        logger.warning(line)
+                    elif "critical" in line:
+                        logger.critical(line)
+                    elif (
+                        streamdetails.audio_format.content_type == ContentType.UNKNOWN
+                        and line.startswith("Stream #0:0: Audio: ")
+                    ):
+                        # if streamdetails contenttype is unknown, try parse it from the ffmpeg log
+                        streamdetails.audio_format.content_type = ContentType.try_parse(
+                            line.split("Stream #0:0: Audio: ")[1].split(" ")[0]
+                        )
+                    elif stderr_data or "loudnorm" in line:
+                        stderr_data += line
+                    else:
+                        logger.debug(line)
+                    del line
+
+                # if we reach this point, the process is finished (completed or aborted)
+                if ffmpeg_proc.returncode == 0:
+                    await state_data["finished"].wait()
+                finished = state_data["finished"].is_set()
+                bytes_sent = state_data["bytes_sent"]
+                seconds_streamed = bytes_sent / pcm_format.pcm_sample_size if bytes_sent else 0
+                streamdetails.seconds_streamed = seconds_streamed
+                state_str = "finished" if finished else "aborted"
+                logger.debug(
+                    "stream %s for: %s (%s seconds streamed, exitcode %s)",
+                    state_str,
+                    streamdetails.uri,
+                    seconds_streamed,
+                    ffmpeg_proc.returncode,
+                )
+                # store accurate duration
+                if finished and not streamdetails.seek_position:
+                    streamdetails.duration = seconds_streamed
+
+                # parse loudnorm data if we have that collected
+                if stderr_data and (loudness_details := parse_loudnorm(stderr_data)):
+                    required_seconds = 600 if streamdetails.media_type == MediaType.RADIO else 120
+                    if finished or (seconds_streamed >= required_seconds):
+                        logger.debug(
+                            "Loudness measurement for %s: %s", streamdetails.uri, loudness_details
+                        )
+                        streamdetails.loudness = loudness_details
+                        await self.mass.music.set_track_loudness(
+                            streamdetails.item_id, streamdetails.provider, loudness_details
+                        )
 
-            # report playback
-            if finished or seconds_streamed > 30:
-                self.mass.create_task(
-                    self.mass.music.mark_item_played(
-                        streamdetails.media_type, streamdetails.item_id, streamdetails.provider
+                # report playback
+                # TODO: Move this to the queue controller ?
+                if finished or seconds_streamed > 30:
+                    self.mass.create_task(
+                        self.mass.music.mark_item_played(
+                            streamdetails.media_type, streamdetails.item_id, streamdetails.provider
+                        )
                     )
-                )
-                if music_prov := self.mass.get_provider(streamdetails.provider):
-                    self.mass.create_task(music_prov.on_streamed(streamdetails, seconds_streamed))
-            # cleanup
-            del stderr_data
-
-        audio_source = self.mass.get_provider(streamdetails.provider).get_audio_stream(
-            streamdetails,
-            seek_position=streamdetails.seek_position,
-        )
-        async with AsyncProcess(
-            ffmpeg_args, stdin=audio_source, stdout=True, stderr=True, name="ffmpeg_media_stream"
-        ) as ffmpeg_proc:
-            state_data = {"finished": asyncio.Event(), "bytes_sent": 0}
-            logger.debug("start media stream for: %s", streamdetails.uri)
+                    if music_prov := self.mass.get_provider(streamdetails.provider):
+                        self.mass.create_task(
+                            music_prov.on_streamed(streamdetails, seconds_streamed)
+                        )
+                # cleanup
+                del stderr_data
 
-            self.mass.create_task(log_reader(ffmpeg_proc, state_data))
+            self.mass.create_task(log_reader())
 
             # get pcm chunks from stdout
             # we always stay buffer_size of bytes behind
@@ -1108,8 +1124,8 @@ class StreamsController(CoreController):
                     # buffer is not full enough, move on
                     continue
 
-                if strip_silence_begin and chunk_num == 2:
-                    # first 2 chunks received, strip silence of beginning
+                if strip_silence_begin and chunk_num == 3:
+                    # first 3 chunks received, strip silence of beginning
                     stripped_audio = await strip_silence(
                         self.mass,
                         buffer,
@@ -1130,11 +1146,6 @@ class StreamsController(CoreController):
                     yield subchunk
                     del subchunk
 
-            # if we did not receive any data, something went (terribly) wrong
-            # raise here to prevent an (endless) loop elsewhere
-            if state_data["bytes_sent"] == 0:
-                raise AudioError(f"stream error on {streamdetails.uri}")
-
             # all chunks received, strip silence of last part if needed and yield remaining bytes
             if strip_silence_end:
                 final_chunk = await strip_silence(
index 6c1ef7d68e4fbc2987f3f0ec070540f1837df856..c07e609534f36f58096afc2dd1c4f883f0c8b634 100644 (file)
@@ -9,11 +9,10 @@ import re
 import struct
 from collections import deque
 from io import BytesIO
-from time import time
 from typing import TYPE_CHECKING
 
 import aiofiles
-from aiohttp import ClientResponseError, ClientTimeout
+from aiohttp import ClientTimeout
 
 from music_assistant.common.helpers.global_cache import (
     get_global_cache_value,
@@ -59,8 +58,45 @@ LOGGER = logging.getLogger(f"{MASS_LOGGER_NAME}.audio")
 # pylint:disable=consider-using-f-string,too-many-locals,too-many-statements
 # ruff: noqa: PLR0915
 
-VLC_HEADERS = {"User-Agent": "VLC/3.0.2.LibVLC/3.0.2"}
-VLC_HEADERS_ICY = {**VLC_HEADERS, "Icy-MetaData": "1"}
+HTTP_HEADERS = {"User-Agent": "Lavf/60.16.100.MusicAssistant"}
+HTTP_HEADERS_ICY = {**HTTP_HEADERS, "Icy-MetaData": "1"}
+
+
+class FFMpeg(AsyncProcess):
+    """FFMpeg wrapped as AsyncProcess."""
+
+    def __init__(
+        self,
+        audio_input: AsyncGenerator[bytes, None] | str | int,
+        input_format: AudioFormat,
+        output_format: AudioFormat,
+        filter_params: list[str] | None = None,
+        extra_args: list[str] | None = None,
+        extra_input_args: list[str] | None = None,
+        name: str = "ffmpeg",
+        stderr_enabled: bool = False,
+        audio_output: str | int = "-",
+    ) -> None:
+        """Initialize AsyncProcess."""
+        ffmpeg_args = get_ffmpeg_args(
+            input_format=input_format,
+            output_format=output_format,
+            filter_params=filter_params or [],
+            extra_args=extra_args or [],
+            input_path=audio_input if isinstance(audio_input, str) else "-",
+            output_path=audio_output if isinstance(audio_output, str) else "-",
+            extra_input_args=extra_input_args or [],
+            loglevel="info"
+            if stderr_enabled or LOGGER.isEnabledFor(VERBOSE_LOG_LEVEL)
+            else "error",
+        )
+        super().__init__(
+            ffmpeg_args,
+            stdin=True if isinstance(audio_input, str) else audio_input,
+            stdout=True if isinstance(audio_output, str) else audio_output,
+            stderr=stderr_enabled,
+            name=name,
+        )
 
 
 async def crossfade_pcm_parts(
@@ -201,10 +237,10 @@ async def get_stream_details(
     Do not try to request streamdetails in advance as this is expiring data.
         param media_item: The QueueItem for which to request the streamdetails for.
     """
-    if queue_item.streamdetails and (time() + 60) < queue_item.streamdetails.expires:
+    if queue_item.streamdetails and seek_position:
         LOGGER.debug(f"Using (pre)cached streamdetails from queue_item for {queue_item.uri}")
-        # we already have (fresh) streamdetails stored on the queueitem, use these.
-        # this happens for example while seeking in a track.
+        # we already have (fresh?) streamdetails stored on the queueitem, use these.
+        # only do this when we're seeking.
         # we create a copy (using to/from dict) to ensure the one-time values are cleared
         streamdetails = StreamDetails.from_dict(queue_item.streamdetails.to_dict())
     else:
@@ -222,13 +258,6 @@ async def get_stream_details(
             if not music_prov:
                 LOGGER.debug(f"Skipping {prov_media} - provider not available")
                 continue  # provider not available ?
-            # prefer cache
-            item_key = f"{music_prov.lookup_key}/{prov_media.item_id}"
-            cache_key = f"cached_streamdetails_{item_key}"
-            if cache := await mass.cache.get(cache_key):
-                if time() + 60 < cache["expires"]:
-                    LOGGER.debug(f"Using cached streamdetails for {item_key}")
-                    streamdetails = StreamDetails.from_dict(cache)
             # get streamdetails from provider
             try:
                 streamdetails: StreamDetails = await music_prov.get_stream_details(
@@ -237,9 +266,6 @@ async def get_stream_details(
             except MusicAssistantError as err:
                 LOGGER.warning(str(err))
             else:
-                # store streamdetails in cache
-                expiration = streamdetails.expires - time()
-                await mass.cache.set(cache_key, streamdetails.to_dict(), expiration=expiration - 60)
                 break
         else:
             raise MediaNotFoundError(f"Unable to retrieve streamdetails for {queue_item}")
@@ -322,9 +348,7 @@ def create_wave_header(samplerate=44100, channels=2, bitspersample=16, duration=
     return file.getvalue()
 
 
-async def resolve_radio_stream(
-    mass: MusicAssistant, url: str, use_get: bool = False
-) -> tuple[str, bool, bool]:
+async def resolve_radio_stream(mass: MusicAssistant, url: str) -> tuple[str, bool, bool]:
     """
     Resolve a streaming radio URL.
 
@@ -345,16 +369,15 @@ async def resolve_radio_stream(
     resolved_url = url
     timeout = ClientTimeout(total=0, connect=10, sock_read=5)
     try:
-        method = "GET" if use_get else "HEAD"
-        async with mass.http_session.request(
-            method, url, headers=VLC_HEADERS_ICY, allow_redirects=True, timeout=timeout
+        async with mass.http_session.get(
+            url, headers=HTTP_HEADERS_ICY, allow_redirects=True, timeout=timeout
         ) as resp:
             resolved_url = str(resp.real_url)
             headers = resp.headers
             resp.raise_for_status()
             if not resp.headers:
                 raise InvalidDataError("no headers found")
-        supports_icy = headers.get("icy-name") is not None or "Icecast" in headers.get("server", "")
+        supports_icy = headers.get("icy-metaint") is not None
         is_hls = headers.get("content-type") in HLS_CONTENT_TYPES
         if (
             base_url.endswith((".m3u", ".m3u8", ".pls"))
@@ -371,9 +394,7 @@ async def resolve_radio_stream(
             except IsHLSPlaylist:
                 is_hls = True
 
-    except (ClientResponseError, InvalidDataError) as err:
-        if not use_get:
-            return await resolve_radio_stream(mass, resolved_url, True)
+    except Exception as err:
         LOGGER.warning("Error while parsing radio URL %s: %s", url, err)
         return (resolved_url, supports_icy, is_hls)
 
@@ -383,33 +404,13 @@ async def resolve_radio_stream(
     return result
 
 
-async def get_radio_stream(
-    mass: MusicAssistant, url: str, streamdetails: StreamDetails
-) -> AsyncGenerator[bytes, None]:
-    """Get radio audio stream from HTTP, including metadata retrieval."""
-    resolved_url, supports_icy, is_hls = await resolve_radio_stream(mass, url)
-    # handle special HLS stream
-    if is_hls:
-        async for chunk in get_hls_stream(mass, resolved_url, streamdetails):
-            yield chunk
-        return
-    # handle http stream supports icy metadata
-    if supports_icy:
-        async for chunk in get_icy_stream(mass, resolved_url, streamdetails):
-            yield chunk
-        return
-    # generic http stream (without icy metadata)
-    async for chunk in get_http_stream(mass, resolved_url, streamdetails):
-        yield chunk
-
-
 async def get_icy_stream(
     mass: MusicAssistant, url: str, streamdetails: StreamDetails
 ) -> AsyncGenerator[bytes, None]:
     """Get (radio) audio stream from HTTP, including ICY metadata retrieval."""
     timeout = ClientTimeout(total=0, connect=30, sock_read=5 * 60)
     LOGGER.debug("Start streaming radio with ICY metadata from url %s", url)
-    async with mass.http_session.get(url, headers=VLC_HEADERS_ICY, timeout=timeout) as resp:
+    async with mass.http_session.get(url, headers=HTTP_HEADERS_ICY, timeout=timeout) as resp:
         headers = resp.headers
         meta_int = int(headers["icy-metaint"])
         while True:
@@ -440,7 +441,7 @@ async def get_hls_stream(
     timeout = ClientTimeout(total=0, connect=30, sock_read=5 * 60)
     # fetch master playlist and select (best) child playlist
     # https://datatracker.ietf.org/doc/html/draft-pantos-http-live-streaming-19#section-10
-    async with mass.http_session.get(url, headers=VLC_HEADERS, timeout=timeout) as resp:
+    async with mass.http_session.get(url, headers=HTTP_HEADERS, timeout=timeout) as resp:
         charset = resp.charset or "utf-8"
         master_m3u_data = await resp.text(charset)
     substreams = parse_m3u(master_m3u_data)
@@ -471,7 +472,7 @@ async def get_hls_stream(
     has_id3_metadata: bool | None = None
     while True:
         async with mass.http_session.get(
-            substream_url, headers=VLC_HEADERS, timeout=timeout
+            substream_url, headers=HTTP_HEADERS, timeout=timeout
         ) as resp:
             charset = resp.charset or "utf-8"
             substream_m3u_data = await resp.text(charset)
@@ -487,7 +488,7 @@ async def get_hls_stream(
                 chunk_item_url = base_path + "/" + chunk_item.path
             # handle (optional) in-playlist (timed) metadata
             if has_playlist_metadata is None:
-                has_playlist_metadata = chunk_item.title is not None
+                has_playlist_metadata = chunk_item.title not in (None, "")
                 logger.debug("Station support for in-playlist metadata: %s", has_playlist_metadata)
             if has_playlist_metadata and chunk_item.title != "no desc":
                 # bbc (and maybe others?) set the title to 'no desc'
@@ -496,7 +497,7 @@ async def get_hls_stream(
             # prevent that we play this chunk again if we loop through
             prev_chunks.append(chunk_item.path)
             async with mass.http_session.get(
-                chunk_item_url, headers=VLC_HEADERS, timeout=timeout
+                chunk_item_url, headers=HTTP_HEADERS, timeout=timeout
             ) as resp:
                 async for chunk in resp.content.iter_any():
                     yield chunk
@@ -522,13 +523,13 @@ async def get_http_stream(
     # try to get filesize with a head request
     seek_supported = streamdetails.can_seek
     if seek_position or not streamdetails.size:
-        async with mass.http_session.head(url, headers=VLC_HEADERS) as resp:
+        async with mass.http_session.head(url, headers=HTTP_HEADERS) as resp:
             resp.raise_for_status()
             if size := resp.headers.get("Content-Length"):
                 streamdetails.size = int(size)
             seek_supported = resp.headers.get("Accept-Ranges") == "bytes"
     # headers
-    headers = {**VLC_HEADERS}
+    headers = {**HTTP_HEADERS}
     timeout = ClientTimeout(total=0, connect=30, sock_read=5 * 60)
     skip_bytes = 0
     if seek_position and streamdetails.size:
@@ -545,23 +546,13 @@ async def get_http_stream(
             ContentType.M4B,
         )
     ):
-        LOGGER.debug(
-            "Seeking in %s (%s) not possible, fallback to ffmpeg seeking.",
+        LOGGER.warning(
+            "Seeking in %s (%s) not possible.",
             streamdetails.uri,
             streamdetails.audio_format.output_format_str,
         )
-        async for chunk in get_ffmpeg_stream(
-            url,
-            # we must set the input content type to unknown to
-            # enforce ffmpeg to determine it from the headers
-            input_format=AudioFormat(content_type=ContentType.UNKNOWN),
-            # enforce wav as we dont want to re-encode lossy formats
-            # choose wav so we have descriptive headers and move on
-            output_format=AudioFormat(content_type=ContentType.WAV),
-            extra_input_args=["-ss", str(seek_position)],
-        ):
-            yield chunk
-        return
+        seek_position = 0
+        streamdetails.seek_position = 0
 
     # start the streaming from http
     bytes_received = 0
@@ -608,23 +599,13 @@ async def get_file_stream(
             ContentType.MP4,
         )
     ):
-        LOGGER.debug(
-            "Seeking in %s (%s) not possible, fallback to ffmpeg seeking.",
+        LOGGER.warning(
+            "Seeking in %s (%s) not possible.",
             streamdetails.uri,
             streamdetails.audio_format.output_format_str,
         )
-        async for chunk in get_ffmpeg_stream(
-            filename,
-            # we must set the input content type to unknown to
-            # enforce ffmpeg to determine it from the headers
-            input_format=AudioFormat(content_type=ContentType.UNKNOWN),
-            # enforce wav as we dont want to re-encode lossy formats
-            # choose wav so we have descriptive headers and move on
-            output_format=AudioFormat(content_type=ContentType.WAV),
-            extra_input_args=["-ss", str(seek_position)],
-        ):
-            yield chunk
-        return
+        seek_position = 0
+        streamdetails.seek_position = 0
 
     chunk_size = get_chunksize(streamdetails.audio_format)
     async with aiofiles.open(streamdetails.data, "rb") as _file:
@@ -646,9 +627,8 @@ async def get_ffmpeg_stream(
     filter_params: list[str] | None = None,
     extra_args: list[str] | None = None,
     chunk_size: int | None = None,
-    ffmpeg_loglevel: str = "info",
     extra_input_args: list[str] | None = None,
-    logger: logging.Logger | None = None,
+    name: str = "ffmpeg",
 ) -> AsyncGenerator[bytes, None]:
     """
     Get the ffmpeg audio stream as async generator.
@@ -656,27 +636,18 @@ async def get_ffmpeg_stream(
     Takes care of resampling and/or recoding if needed,
     according to player preferences.
     """
-    ffmpeg_args = get_ffmpeg_args(
+    async with FFMpeg(
+        audio_input=audio_input,
         input_format=input_format,
         output_format=output_format,
-        filter_params=filter_params or [],
-        extra_args=extra_args or [],
-        input_path=audio_input if isinstance(audio_input, str) else "-",
-        output_path="-",
-        loglevel=ffmpeg_loglevel,
-        extra_input_args=extra_input_args or [],
-    )
-    stdin = audio_input if not isinstance(audio_input, str) else True
-    async with AsyncProcess(
-        ffmpeg_args,
-        stdin=stdin,
-        stdout=True,
-        stderr=logger or LOGGER.getChild("ffmpeg_stream"),
-        name="ffmpeg_stream",
+        filter_params=filter_params,
+        extra_args=extra_args,
+        extra_input_args=extra_input_args,
+        name=name,
     ) as ffmpeg_proc:
         # read final chunks from stdout
-        chunk_size = chunk_size or get_chunksize(output_format, 1)
-        async for chunk in ffmpeg_proc.iter_chunked(chunk_size):
+        iterator = ffmpeg_proc.iter_chunked(chunk_size) if chunk_size else ffmpeg_proc.iter_any()
+        async for chunk in iterator:
             yield chunk
 
 
@@ -809,8 +780,8 @@ def get_ffmpeg_args(
     extra_args: list[str] | None = None,
     input_path: str = "-",
     output_path: str = "-",
-    loglevel: str = "info",
     extra_input_args: list[str] | None = None,
+    loglevel: str = "error",
 ) -> list[str]:
     """Collect all args to send to the ffmpeg process."""
     if extra_args is None:
@@ -833,6 +804,7 @@ def get_ffmpeg_args(
         "-hide_banner",
         "-loglevel",
         loglevel,
+        "-nostats",
         "-ignore_unknown",
         "-protocol_whitelist",
         "file,http,https,tcp,tls,crypto,pipe,data,fd",
index 532de822a647cda0f10de3f63af4284b402e0d29..9dc88511acc52558355df5e34adae5a2aef607f5 100644 (file)
@@ -19,7 +19,7 @@ from signal import SIGINT
 from types import TracebackType
 from typing import TYPE_CHECKING
 
-from music_assistant.constants import MASS_LOGGER_NAME, VERBOSE_LOG_LEVEL
+from music_assistant.constants import MASS_LOGGER_NAME
 
 LOGGER = logging.getLogger(f"{MASS_LOGGER_NAME}.helpers.process")
 
@@ -42,16 +42,16 @@ class AsyncProcess:
         args: list[str],
         stdin: bool | int | AsyncGenerator[bytes, None] | None = None,
         stdout: bool | int | None = None,
-        stderr: bool | int | logging.Logger | None = None,
+        stderr: bool | int | None = False,
         name: str | None = None,
     ) -> None:
         """Initialize AsyncProcess."""
         self.proc: asyncio.subprocess.Process | None = None
-        if isinstance(stderr, logging.Logger):
-            self._stderr_logger = stderr
-            stderr = asyncio.subprocess.PIPE
-        else:
-            self._stderr_logger = None
+        if name is None:
+            name = args[0].split(os.sep)[-1]
+        self.name = name
+        self.attached_tasks: list[asyncio.Task] = []
+        self.logger = LOGGER.getChild(name)
         self._args = args
         self._stdin = stdin
         self._stdout = stdout
@@ -61,10 +61,6 @@ class AsyncProcess:
         self._stderr_enabled = stderr not in (None, False)
         self._close_called = False
         self._returncode: bool | None = None
-        if name is None:
-            name = self._args[0].split(os.sep)[-1]
-        self.name = name
-        self.attached_tasks: list[asyncio.Task] = []
 
     @property
     def closed(self) -> bool:
@@ -100,29 +96,17 @@ class AsyncProcess:
 
     async def start(self) -> None:
         """Perform Async init of process."""
-        if self._stdin is True or isinstance(self._stdin, AsyncGenerator):
-            stdin = asyncio.subprocess.PIPE
-        else:
-            stdin = self._stdin
-        if self._stdout is True or isinstance(self._stdout, AsyncGenerator):
-            stdout = asyncio.subprocess.PIPE
-        else:
-            stdout = self._stdout
-        if self._stderr is True or isinstance(self._stderr, AsyncGenerator):
-            stderr = asyncio.subprocess.PIPE
-        else:
-            stderr = self._stderr
         self.proc = await asyncio.create_subprocess_exec(
             *self._args,
-            stdin=stdin if self._stdin_enabled else None,
-            stdout=stdout if self._stdout_enabled else None,
-            stderr=stderr if self._stderr_enabled else None,
+            stdin=asyncio.subprocess.PIPE
+            if (self._stdin is True or isinstance(self._stdin, AsyncGenerator))
+            else self._stdin,
+            stdout=asyncio.subprocess.PIPE if self._stdout is True else self._stdout,
+            stderr=asyncio.subprocess.PIPE if self._stderr is True else self._stderr,
         )
-        LOGGER.debug("Process %s started with PID %s", self.name, self.proc.pid)
-        if not isinstance(self._stdin, int | None):
+        self.logger.debug("Process %s started with PID %s", self.name, self.proc.pid)
+        if isinstance(self._stdin, AsyncGenerator):
             self.attached_tasks.append(asyncio.create_task(self._feed_stdin()))
-        if self._stderr_logger:
-            self.attached_tasks.append(asyncio.create_task(self._read_stderr()))
 
     async def iter_chunked(self, n: int = DEFAULT_CHUNKSIZE) -> AsyncGenerator[bytes, None]:
         """Yield chunks of n size from the process stdout."""
@@ -213,13 +197,13 @@ class AsyncProcess:
                     if self.returncode is not None:
                         break
             except TimeoutError:
-                LOGGER.debug(
+                self.logger.debug(
                     "Process %s with PID %s did not stop in time. Sending terminate...",
                     self.name,
                     self.proc.pid,
                 )
                 self.proc.terminate()
-        LOGGER.debug(
+        self.logger.debug(
             "Process %s with PID %s stopped with returncode %s",
             self.name,
             self.proc.pid,
@@ -240,25 +224,6 @@ class AsyncProcess:
         self._returncode = self.proc.returncode
         return (stdout, stderr)
 
-    async def iter_stderr(self) -> AsyncGenerator[bytes, None]:
-        """Iterate lines from the stderr stream."""
-        while self.returncode is None:
-            try:
-                line = await self.proc.stderr.readline()
-                if line == b"":
-                    break
-                yield line
-            except ValueError as err:
-                # we're waiting for a line (separator found), but the line was too big
-                # this may happen with ffmpeg during a long (radio) stream where progress
-                # gets outputted to the stderr but no newline
-                # https://stackoverflow.com/questions/55457370/how-to-avoid-valueerror-separator-is-not-found-and-chunk-exceed-the-limit
-                # NOTE: this consumes the line that was too big
-                if "chunk exceed the limit" in str(err):
-                    continue
-                # raise for all other (value) errors
-                raise
-
     async def _feed_stdin(self) -> None:
         """Feed stdin with chunks from an AsyncGenerator."""
         if TYPE_CHECKING:
@@ -269,24 +234,39 @@ class AsyncProcess:
                     return
                 await self.write(chunk)
             await self.write_eof()
-        except asyncio.CancelledError:
+        except Exception as err:
+            if not isinstance(err, asyncio.CancelledError):
+                self.logger.exception(err)
             # make sure the stdin generator is also properly closed
             # by propagating a cancellederror within
             task = asyncio.create_task(self._stdin.__anext__())
             task.cancel()
 
-    async def _read_stderr(self) -> None:
-        """Read stderr and log to logger."""
-        async for line in self.iter_stderr():
-            line = line.decode().strip()  # noqa: PLW2901
+    async def read_stderr(self) -> bytes:
+        """Read line from stderr."""
+        try:
+            return await self.proc.stderr.readline()
+        except ValueError as err:
+            # we're waiting for a line (separator found), but the line was too big
+            # this may happen with ffmpeg during a long (radio) stream where progress
+            # gets outputted to the stderr but no newline
+            # https://stackoverflow.com/questions/55457370/how-to-avoid-valueerror-separator-is-not-found-and-chunk-exceed-the-limit
+            # NOTE: this consumes the line that was too big
+            if "chunk exceed the limit" in str(err):
+                return await self.proc.stderr.readline()
+            # raise for all other (value) errors
+            raise
+
+    async def iter_stderr(self) -> AsyncGenerator[str, None]:
+        """Iterate lines from the stderr stream as string."""
+        while True:
+            line = await self.read_stderr()
+            if line == b"":
+                break
+            line = line.decode().strip()
             if not line:
                 continue
-            if "error" in line.lower():
-                self._stderr_logger.error(line)
-            elif "warning" in line.lower():
-                self._stderr_logger.warning(line)
-            else:
-                self._stderr_logger.log(VERBOSE_LOG_LEVEL, line)
+            yield line
 
 
 async def check_output(args: str | list[str]) -> tuple[int, bytes]:
index 5300405c37f3f5a721bc9b3aba58ff8a1485d818..e1b981c84fb7734b5cd9b28156d45bb9a9b4c325 100644 (file)
@@ -251,7 +251,11 @@ class MusicProvider(Provider):
     async def get_audio_stream(  # type: ignore[return]
         self, streamdetails: StreamDetails, seek_position: int = 0
     ) -> AsyncGenerator[bytes, None]:
-        """Return the audio stream for the provider item."""
+        """
+        Return the (custom) audio stream for the provider item.
+
+        Will only be called when the stream_type is set to CUSTOM.
+        """
         raise NotImplementedError
 
     async def on_streamed(self, streamdetails: StreamDetails, seconds_streamed: int) -> None:
index 92e343af0f831cd31796b6f6600f04fcc6b8531c..107e8d8b2d77284bb9c72d73eddc7f71833d3442 100644 (file)
@@ -265,7 +265,6 @@ class AirplayStream:
             input_format=self.input_format,
             output_format=AIRPLAY_PCM_FORMAT,
             filter_params=get_player_filter_params(self.mass, player_id),
-            loglevel="fatal",
         )
         self._ffmpeg_proc = AsyncProcess(
             ffmpeg_args,
index 967b670e50642e0e02909a15fc453e6fb280a385..e77b5da394b3f9c722729f9e422ec94687cf6fa0 100644 (file)
@@ -26,6 +26,7 @@ from music_assistant.common.models.enums import (
     ImageType,
     MediaType,
     ProviderFeature,
+    StreamType,
 )
 from music_assistant.common.models.errors import LoginFailed
 from music_assistant.common.models.media_items import (
@@ -450,9 +451,9 @@ class DeezerProvider(MusicProvider):  # pylint: disable=W0223
             audio_format=AudioFormat(
                 content_type=ContentType.try_parse(url_details["format"].split("_")[0])
             ),
+            stream_type=StreamType.CUSTOM,
             duration=int(song_data["DURATION"]),
             data={"url": url, "format": url_details["format"]},
-            expires=url_details["exp"],
             size=int(song_data[f"FILESIZE_{url_details['format']}"]),
         )
 
index 5695c40f4846440ec23149fd8fbae4ffbfb3a2e0..ebebc14dea1e38584f9e61418e06182f1b4fb782 100644 (file)
@@ -13,9 +13,7 @@ from aiofiles.os import wrap
 from music_assistant.common.models.config_entries import ConfigEntry, ConfigValueType
 from music_assistant.common.models.enums import ConfigEntryType
 from music_assistant.common.models.errors import SetupFailedError
-from music_assistant.common.models.streamdetails import StreamDetails
 from music_assistant.constants import CONF_PATH
-from music_assistant.server.helpers.audio import get_file_stream
 
 from .base import (
     CONF_ENTRY_MISSING_ALBUM_ARTIST,
@@ -169,14 +167,6 @@ class LocalFileSystemProvider(FileSystemProviderBase):
         abs_path = get_absolute_path(self.base_path, file_path)
         return await exists(abs_path)
 
-    async def get_audio_stream(
-        self, streamdetails: StreamDetails, seek_position: int = 0
-    ) -> AsyncGenerator[bytes, None]:
-        """Return the audio stream for the provider item."""
-        abs_path = get_absolute_path(self.base_path, streamdetails.item_id)
-        async for chunk in get_file_stream(self.mass, abs_path, streamdetails, seek_position):
-            yield chunk
-
     async def read_file_content(self, file_path: str, seek: int = 0) -> AsyncGenerator[bytes, None]:
         """Yield (binary) contents of file in chunks of bytes."""
         abs_path = get_absolute_path(self.base_path, file_path)
index b926837ee2c773cc24b8e6481139a4fbcce43f45..ba57c40aaca2aa41c53e8b237510e080490f2cd1 100644 (file)
@@ -18,7 +18,7 @@ from music_assistant.common.models.config_entries import (
     ConfigEntryType,
     ConfigValueOption,
 )
-from music_assistant.common.models.enums import ExternalID, ProviderFeature
+from music_assistant.common.models.enums import ExternalID, ProviderFeature, StreamType
 from music_assistant.common.models.errors import (
     InvalidDataError,
     MediaNotFoundError,
@@ -613,9 +613,11 @@ class FileSystemProviderBase(MusicProvider):
             item_id=item_id,
             audio_format=prov_mapping.audio_format,
             media_type=MediaType.TRACK,
+            stream_type=StreamType.LOCAL_FILE if file_item.local_path else StreamType.CUSTOM,
             duration=library_item.duration,
             size=file_item.file_size,
-            data=file_item.local_path,
+            data=file_item,
+            path=file_item.local_path,
             can_seek=prov_mapping.audio_format.content_type in SEEKABLE_FILES,
         )
 
index dccc2cd4177e609a2f46f50375666bd45bcad357..b999ad279096fa737b6a6a1b171e046370431ca7 100644 (file)
@@ -26,6 +26,7 @@ from music_assistant.common.models.enums import (
     ImageType,\r
     MediaType,\r
     ProviderFeature,\r
+    StreamType,\r
 )\r
 from music_assistant.common.models.errors import (\r
     InvalidDataError,\r
@@ -52,7 +53,6 @@ from music_assistant.common.models.media_items import Artist as JellyfinArtist
 from music_assistant.common.models.media_items import Playlist as JellyfinPlaylist\r
 from music_assistant.common.models.media_items import Track as JellyfinTrack\r
 from music_assistant.common.models.streamdetails import StreamDetails\r
-from music_assistant.server.helpers.audio import get_http_stream\r
 \r
 if TYPE_CHECKING:\r
     from music_assistant.common.models.provider import ProviderManifest\r
@@ -725,21 +725,25 @@ class JellyfinProvider(MusicProvider):
         jellyfin_track = API.get_item(self._jellyfin_server.jellyfin, item_id)\r
         mimetype = self._media_mime_type(jellyfin_track)\r
         media_stream = jellyfin_track[ITEM_KEY_MEDIA_STREAMS][0]\r
+        url = API.audio_url(\r
+            self._jellyfin_server.jellyfin, jellyfin_track[ITEM_KEY_ID], SUPPORTED_CONTAINER_FORMATS\r
+        )\r
         if ITEM_KEY_MEDIA_CODEC in media_stream:\r
-            media_type = ContentType.try_parse(media_stream[ITEM_KEY_MEDIA_CODEC])\r
+            content_type = ContentType.try_parse(media_stream[ITEM_KEY_MEDIA_CODEC])\r
         else:\r
-            media_type = ContentType.try_parse(mimetype)\r
+            content_type = ContentType.try_parse(mimetype)\r
         return StreamDetails(\r
             item_id=jellyfin_track[ITEM_KEY_ID],\r
             provider=self.instance_id,\r
             audio_format=AudioFormat(\r
-                content_type=media_type,\r
+                content_type=content_type,\r
                 channels=jellyfin_track[ITEM_KEY_MEDIA_STREAMS][0][ITEM_KEY_MEDIA_CHANNELS],\r
             ),\r
+            stream_type=StreamType.HTTP,\r
             duration=int(\r
                 jellyfin_track[ITEM_KEY_RUNTIME_TICKS] / 10000000\r
             ),  # 10000000 ticks per millisecond)\r
-            data=jellyfin_track,\r
+            path=url,\r
         )\r
 \r
     def _get_thumbnail_url(self, client: JellyfinClient, media_item: dict[str, Any]) -> str | None:\r
@@ -808,14 +812,3 @@ class JellyfinProvider(MusicProvider):
         mime_type, _ = mimetypes.guess_type(path)\r
 \r
         return mime_type\r
-\r
-    async def get_audio_stream(\r
-        self, streamdetails: StreamDetails, seek_position: int = 0\r
-    ) -> AsyncGenerator[bytes, None]:\r
-        """Return the audio stream for the provider item."""\r
-        url = API.audio_url(\r
-            self._jellyfin_server.jellyfin, streamdetails.item_id, SUPPORTED_CONTAINER_FORMATS\r
-        )\r
-\r
-        async for chunk in get_http_stream(self.mass, url, streamdetails, seek_position):\r
-            yield chunk\r
index fe34e475b5b7b38ced666d73f9cf09dcdcfc7de5..8796f373cfce571c3c36c9b95b956daa84ed6ba1 100644 (file)
@@ -15,7 +15,13 @@ from libopensonic.errors import (
     SonicError,
 )
 
-from music_assistant.common.models.enums import ContentType, ImageType, MediaType, ProviderFeature
+from music_assistant.common.models.enums import (
+    ContentType,
+    ImageType,
+    MediaType,
+    ProviderFeature,
+    StreamType,
+)
 from music_assistant.common.models.errors import LoginFailed, MediaNotFoundError
 from music_assistant.common.models.media_items import (
     Album,
@@ -671,6 +677,7 @@ class OpenSonicProvider(MusicProvider):
             provider=self.instance_id,
             can_seek=self._seek_support,
             audio_format=AudioFormat(content_type=ContentType.try_parse(mime_type)),
+            stream_type=StreamType.CUSTOM,
             duration=sonic_song.duration if sonic_song.duration is not None else 0,
         )
 
index 93367f059fa64e30e74700b15a0bd5cadefeff11..7308553c414b0c73683f4d3ecc935c3027d5fb80 100644 (file)
@@ -28,6 +28,7 @@ from music_assistant.common.models.enums import (
     ImageType,
     MediaType,
     ProviderFeature,
+    StreamType,
 )
 from music_assistant.common.models.errors import InvalidDataError, LoginFailed, MediaNotFoundError
 from music_assistant.common.models.media_items import (
@@ -46,7 +47,6 @@ from music_assistant.common.models.media_items import (
     Track,
 )
 from music_assistant.common.models.streamdetails import StreamDetails
-from music_assistant.server.helpers.audio import get_http_stream
 from music_assistant.server.helpers.auth import AuthenticationHelper
 from music_assistant.server.helpers.tags import parse_tags
 from music_assistant.server.models.music_provider import MusicProvider
@@ -755,12 +755,13 @@ class PlexProvider(MusicProvider):
                 content_type=media_type,
                 channels=media.audioChannels,
             ),
+            stream_type=StreamType.HTTP,
             duration=plex_track.duration,
             data=plex_track,
         )
 
         if media_type != ContentType.M4A:
-            stream_details.data = self._plex_server.url(media_part.key, True)
+            stream_details.path = self._plex_server.url(media_part.key, True)
             if audio_stream.samplingRate:
                 stream_details.audio_format.sample_rate = audio_stream.samplingRate
             if audio_stream.bitDepth:
@@ -769,7 +770,7 @@ class PlexProvider(MusicProvider):
         else:
             url = plex_track.getStreamURL()
             media_info = await parse_tags(url)
-
+            stream_details.path = url
             stream_details.audio_format.channels = media_info.channels
             stream_details.audio_format.content_type = ContentType.try_parse(media_info.format)
             stream_details.audio_format.sample_rate = media_info.sample_rate
@@ -777,17 +778,6 @@ class PlexProvider(MusicProvider):
 
         return stream_details
 
-    async def get_audio_stream(
-        self, streamdetails: StreamDetails, seek_position: int = 0
-    ) -> AsyncGenerator[bytes, None]:
-        """Return the audio stream for the provider item."""
-        if isinstance(streamdetails.data, str):
-            url = streamdetails.data
-        else:
-            url = streamdetails.data.getStreamURL(offset=seek_position)
-        async for chunk in get_http_stream(self.mass, url, streamdetails, 0):
-            yield chunk
-
     async def get_myplex_account_and_refresh_token(self, auth_token: str) -> MyPlexAccount:
         """Get a MyPlexAccount object and refresh the token if needed."""
 
index dbb07534a11716e2b679d449c815ec91350daea8..0e975730db01b5edfabdf1e23c3d38d90021fb67 100644 (file)
@@ -13,7 +13,12 @@ from asyncio_throttle import Throttler
 
 from music_assistant.common.helpers.util import parse_title_and_version, try_parse_int
 from music_assistant.common.models.config_entries import ConfigEntry, ConfigValueType
-from music_assistant.common.models.enums import ConfigEntryType, ExternalID, ProviderFeature
+from music_assistant.common.models.enums import (
+    ConfigEntryType,
+    ExternalID,
+    ProviderFeature,
+    StreamType,
+)
 from music_assistant.common.models.errors import LoginFailed, MediaNotFoundError
 from music_assistant.common.models.media_items import (
     Album,
@@ -43,7 +48,6 @@ from music_assistant.constants import (
 from music_assistant.server.helpers.app_vars import app_var
 
 # pylint: enable=no-name-in-module
-from music_assistant.server.helpers.audio import get_http_stream
 from music_assistant.server.models.music_provider import MusicProvider
 
 if TYPE_CHECKING:
@@ -401,6 +405,7 @@ class QobuzProvider(MusicProvider):
         else:
             msg = f"Unsupported mime type for {item_id}"
             raise MediaNotFoundError(msg)
+        self.mass.create_task(self._report_playback_started(streamdata))
         return StreamDetails(
             item_id=str(item_id),
             provider=self.instance_id,
@@ -409,22 +414,12 @@ class QobuzProvider(MusicProvider):
                 sample_rate=int(streamdata["sampling_rate"] * 1000),
                 bit_depth=streamdata["bit_depth"],
             ),
+            stream_type=StreamType.HTTP,
             duration=streamdata["duration"],
             data=streamdata,  # we need these details for reporting playback
-            expires=time.time() + 300,  # url expires very fast
+            path=streamdata["url"],
         )
 
-    async def get_audio_stream(
-        self, streamdetails: StreamDetails, seek_position: int = 0
-    ) -> AsyncGenerator[bytes, None]:
-        """Return the audio stream for the provider item."""
-        # report playback started as soon as we start streaming
-        self.mass.create_task(self._report_playback_started(streamdetails.data))
-        async for chunk in get_http_stream(
-            self.mass, streamdetails.data["url"], streamdetails, seek_position
-        ):
-            yield chunk
-
     async def _report_playback_started(self, streamdata: dict) -> None:
         """Report playback start to qobuz."""
         # TODO: need to figure out if the streamed track is purchased by user
index 5141a39749137ad17869cbbf605f6e2a16f09796..7adc431ff04a1b602a42b31333d9736fd5499331 100644 (file)
@@ -7,7 +7,7 @@ from typing import TYPE_CHECKING
 
 from radios import FilterBy, Order, RadioBrowser, RadioBrowserError
 
-from music_assistant.common.models.enums import LinkType, ProviderFeature
+from music_assistant.common.models.enums import LinkType, ProviderFeature, StreamType
 from music_assistant.common.models.media_items import (
     AudioFormat,
     BrowseFolder,
@@ -22,7 +22,6 @@ from music_assistant.common.models.media_items import (
     SearchResults,
 )
 from music_assistant.common.models.streamdetails import StreamDetails
-from music_assistant.server.helpers.audio import get_radio_stream
 from music_assistant.server.models.music_provider import MusicProvider
 
 SUPPORTED_FEATURES = (ProviderFeature.SEARCH, ProviderFeature.BROWSE)
@@ -290,14 +289,7 @@ class RadioBrowserProvider(MusicProvider):
                 content_type=ContentType.try_parse(stream.codec),
             ),
             media_type=MediaType.RADIO,
-            data=stream.url_resolved,
-            expires=time() + 3600,
+            stream_type=StreamType.HTTP,
+            path=stream.url_resolved,
+            can_seek=False,
         )
-
-    async def get_audio_stream(
-        self, streamdetails: StreamDetails, seek_position: int = 0
-    ) -> AsyncGenerator[bytes, None]:
-        """Return the audio stream for the provider item."""
-        # report playback started as soon as we start streaming
-        async for chunk in get_radio_stream(self.mass, streamdetails.data, streamdetails):
-            yield chunk
index 041b458263767a85bf36c15e9a2669858585853f..e42fd6fd3ca3630d4219159bc36a30b523a77ae6 100644 (file)
@@ -36,7 +36,7 @@ from music_assistant.common.models.errors import SetupFailedError
 from music_assistant.common.models.media_items import AudioFormat
 from music_assistant.common.models.player import DeviceInfo, Player
 from music_assistant.constants import UGP_PREFIX
-from music_assistant.server.helpers.audio import get_ffmpeg_args, get_player_filter_params
+from music_assistant.server.helpers.audio import FFMpeg, get_player_filter_params
 from music_assistant.server.helpers.process import AsyncProcess, check_output
 from music_assistant.server.models.player_provider import PlayerProvider
 
@@ -359,24 +359,16 @@ class SnapCastProvider(PlayerProvider):
             stream.set_callback(stream_callback)
             stream_path = f"tcp://{host}:{port}"
             self.logger.debug("Start streaming to %s", stream_path)
-            ffmpeg_args = get_ffmpeg_args(
-                input_format=input_format,
-                output_format=DEFAULT_SNAPCAST_FORMAT,
-                filter_params=get_player_filter_params(self.mass, player_id),
-                output_path=f"tcp://{host}:{port}",
-                loglevel="fatal",
-            )
             try:
-                async with AsyncProcess(
-                    ffmpeg_args,
-                    stdin=True,
-                    stdout=False,
-                    stderr=self.logger.getChild("ffmpeg"),
+                async with FFMpeg(
+                    audio_input=audio_source,
+                    input_format=input_format,
+                    output_format=DEFAULT_SNAPCAST_FORMAT,
+                    filter_params=get_player_filter_params(self.mass, player_id),
                     name="snapcast_ffmpeg",
+                    audio_output=f"tcp://{host}:{port}",
                 ) as ffmpeg_proc:
-                    async for chunk in audio_source:
-                        await ffmpeg_proc.write(chunk)
-                    await ffmpeg_proc.write_eof()
+                    await ffmpeg_proc.wait()
                     # we need to wait a bit for the stream status to become idle
                     # to ensure that all snapclients have consumed the audio
                     await self.mass.players.wait_for_state(player, PlayerState.IDLE)
index 4bfeb783612bcc178034de7cbf2286f4086f4434..20388f4229cdcc223f6728f9c8c84f8ca97005b3 100644 (file)
@@ -535,7 +535,8 @@ class SonosPlayer:
         """Handle callback for topology change event."""
         if xml := event.variables.get("zone_group_state"):
             zgs = ET.fromstring(xml)
-            for vanished_device in zgs.find("VanishedDevices") or []:
+            vanished_devices = zgs.find("VanishedDevices") or []
+            for vanished_device in vanished_devices:
                 if (reason := vanished_device.get("Reason")) not in SUPPORTED_VANISH_REASONS:
                     self.logger.debug(
                         "Ignoring %s marked %s as vanished with reason: %s",
index 5d2385f6963c760d4db7cb92a3ddc917d0c9fa34..587172550bada69b618d66f4996fde3c6996b7ae 100644 (file)
@@ -10,7 +10,7 @@ from soundcloudpy import SoundcloudAsyncAPI
 
 from music_assistant.common.helpers.util import parse_title_and_version
 from music_assistant.common.models.config_entries import ConfigEntry, ConfigValueType
-from music_assistant.common.models.enums import ConfigEntryType, ProviderFeature
+from music_assistant.common.models.enums import ConfigEntryType, ProviderFeature, StreamType
 from music_assistant.common.models.errors import InvalidDataError, LoginFailed
 from music_assistant.common.models.media_items import (
     Artist,
@@ -26,11 +26,6 @@ from music_assistant.common.models.media_items import (
     Track,
 )
 from music_assistant.common.models.streamdetails import StreamDetails
-from music_assistant.server.helpers.audio import (
-    get_hls_stream,
-    get_http_stream,
-    resolve_radio_stream,
-)
 from music_assistant.server.models.music_provider import MusicProvider
 
 CONF_CLIENT_ID = "client_id"
@@ -315,25 +310,10 @@ class SoundcloudMusicProvider(MusicProvider):
             audio_format=AudioFormat(
                 content_type=ContentType.try_parse(stream_format),
             ),
+            stream_type=StreamType.HTTP,
             data=url,
         )
 
-    async def get_audio_stream(
-        self, streamdetails: StreamDetails, seek_position: int = 0
-    ) -> AsyncGenerator[bytes, None]:
-        """Return the audio stream for the provider item."""
-        _, _, is_hls = await resolve_radio_stream(self.mass, streamdetails.data)
-        if is_hls:
-            # some soundcloud streams are HLS, prefer the radio streamer
-            async for chunk in get_hls_stream(self.mass, streamdetails.data, streamdetails):
-                yield chunk
-            return
-        # regular stream from http
-        async for chunk in get_http_stream(
-            self.mass, streamdetails.data, streamdetails, seek_position
-        ):
-            yield chunk
-
     async def _parse_artist(self, artist_obj: dict) -> Artist:
         """Parse a Soundcloud user response to Artist model object."""
         artist_id = None
index 841980f10416fe89e57eba355e18fd8c5c083cfa..80a775ee3b1a3c082f3190488076c91337a8c9b0 100644 (file)
@@ -17,7 +17,12 @@ from asyncio_throttle import Throttler
 from music_assistant.common.helpers.json import json_loads
 from music_assistant.common.helpers.util import parse_title_and_version
 from music_assistant.common.models.config_entries import ConfigEntry, ConfigValueType
-from music_assistant.common.models.enums import ConfigEntryType, ExternalID, ProviderFeature
+from music_assistant.common.models.enums import (
+    ConfigEntryType,
+    ExternalID,
+    ProviderFeature,
+    StreamType,
+)
 from music_assistant.common.models.errors import LoginFailed, MediaNotFoundError
 from music_assistant.common.models.media_items import (
     Album,
@@ -396,18 +401,13 @@ class SpotifyProvider(MusicProvider):
 
     async def get_stream_details(self, item_id: str) -> StreamDetails:
         """Return the content details for the given track when it will be streamed."""
-        # make sure a valid track is requested.
-        track = await self.get_track(item_id)
         return StreamDetails(
-            item_id=track.item_id,
+            item_id=item_id,
             provider=self.instance_id,
             audio_format=AudioFormat(
                 content_type=ContentType.OGG,
             ),
-            duration=track.duration,
-            # these streamdetails may be cached for a long time,
-            # as there is no time sensitive info in them
-            expires=time.time() + 30 * 24 * 3600,
+            stream_type=StreamType.CUSTOM,
         )
 
     async def get_audio_stream(
@@ -437,16 +437,6 @@ class SpotifyProvider(MusicProvider):
                 yield chunk
                 bytes_sent += len(chunk)
 
-        if bytes_sent == 0 and not self._ap_workaround:
-            # AP resolve failure
-            # https://github.com/librespot-org/librespot/issues/972
-            # retry with ap-port set to invalid value, which will force fallback
-            args += ["--ap-port", "12345"]
-            async with AsyncProcess(args, stdout=True) as librespot_proc:
-                async for chunk in librespot_proc.iter_any():
-                    yield chunk
-            self._ap_workaround = True
-
     async def _parse_artist(self, artist_obj):
         """Parse spotify artist object to generic layout."""
         artist = Artist(
index 57b7b10756e18823f36878e4cba4b54a82ca2429..eea1e5de70837c57513f131f2687a4b4d580a128 100644 (file)
@@ -27,6 +27,7 @@ from music_assistant.common.models.enums import (
     ImageType,
     MediaType,
     ProviderFeature,
+    StreamType,
 )
 from music_assistant.common.models.errors import LoginFailed, MediaNotFoundError
 from music_assistant.common.models.media_items import (
@@ -44,7 +45,6 @@ from music_assistant.common.models.media_items import (
     Track,
 )
 from music_assistant.common.models.streamdetails import StreamDetails
-from music_assistant.server.helpers.audio import get_http_stream
 from music_assistant.server.helpers.auth import AuthenticationHelper
 from music_assistant.server.helpers.tags import AudioTags, parse_tags
 from music_assistant.server.models.music_provider import MusicProvider
@@ -439,20 +439,11 @@ class TidalProvider(MusicProvider):
                 bit_depth=media_info.bits_per_sample,
                 channels=media_info.channels,
             ),
+            stream_type=StreamType.HTTP,
             duration=track.duration,
-            data=url,
+            path=url,
         )
 
-    async def get_audio_stream(
-        self, streamdetails: StreamDetails, seek_position: int = 0
-    ) -> AsyncGenerator[bytes, None]:
-        """Return the audio stream for the provider item."""
-        # report playback started as soon as we start streaming
-        async for chunk in get_http_stream(
-            self.mass, streamdetails.data, streamdetails, seek_position
-        ):
-            yield chunk
-
     async def get_artist(self, prov_artist_id: str) -> Artist:
         """Get artist details for given artist id."""
         tidal_session = await self._get_tidal_session()
index af55ab46d7fabed1c85a853edff2569bc1951c6a..408599ec563e048413b46543e51f595a49a17221 100644 (file)
@@ -2,14 +2,13 @@
 
 from __future__ import annotations
 
-from time import time
 from typing import TYPE_CHECKING
 
 from asyncio_throttle import Throttler
 
 from music_assistant.common.helpers.util import create_sort_name
 from music_assistant.common.models.config_entries import ConfigEntry, ConfigValueType
-from music_assistant.common.models.enums import ConfigEntryType, ProviderFeature
+from music_assistant.common.models.enums import ConfigEntryType, ProviderFeature, StreamType
 from music_assistant.common.models.errors import InvalidDataError, LoginFailed, MediaNotFoundError
 from music_assistant.common.models.media_items import (
     AudioFormat,
@@ -22,7 +21,6 @@ from music_assistant.common.models.media_items import (
 )
 from music_assistant.common.models.streamdetails import StreamDetails
 from music_assistant.constants import CONF_USERNAME
-from music_assistant.server.helpers.audio import get_radio_stream
 from music_assistant.server.helpers.tags import parse_tags
 from music_assistant.server.models.music_provider import MusicProvider
 
@@ -227,8 +225,9 @@ class TuneInProvider(MusicProvider):
                     content_type=ContentType.UNKNOWN,
                 ),
                 media_type=MediaType.RADIO,
-                data=item_id,
-                expires=time() + 3600,
+                stream_type=StreamType.HTTP,
+                path=item_id,
+                can_seek=False,
             )
         stream_item_id, media_type = item_id.split("--", 1)
         stream_info = await self.__get_data("Tune.ashx", id=stream_item_id)
@@ -242,20 +241,13 @@ class TuneInProvider(MusicProvider):
                     content_type=ContentType(stream["media_type"]),
                 ),
                 media_type=MediaType.RADIO,
-                data=stream["url"],
-                expires=time() + 3600,
+                stream_type=StreamType.HTTP,
+                path=stream["url"],
+                can_seek=False,
             )
         msg = f"Unable to retrieve stream details for {item_id}"
         raise MediaNotFoundError(msg)
 
-    async def get_audio_stream(
-        self, streamdetails: StreamDetails, seek_position: int = 0
-    ) -> AsyncGenerator[bytes, None]:
-        """Return the audio stream for the provider item."""
-        # report playback started as soon as we start streaming
-        async for chunk in get_radio_stream(self.mass, streamdetails.data, streamdetails):
-            yield chunk
-
     async def __get_data(self, endpoint: str, **kwargs):
         """Get data from api."""
         if endpoint.startswith("http"):
index 9a4d950fddca4a5ddfcd1087ee35a74974769988..4003402bd5fd1af6db5aa78d25fbcf94b9b3c114 100644 (file)
@@ -2,10 +2,9 @@
 
 from __future__ import annotations
 
-import os
 from typing import TYPE_CHECKING
 
-from music_assistant.common.models.enums import ContentType, ImageType, MediaType
+from music_assistant.common.models.enums import ContentType, ImageType, MediaType, StreamType
 from music_assistant.common.models.errors import MediaNotFoundError
 from music_assistant.common.models.media_items import (
     Artist,
@@ -17,18 +16,10 @@ from music_assistant.common.models.media_items import (
     Track,
 )
 from music_assistant.common.models.streamdetails import StreamDetails
-from music_assistant.server.helpers.audio import (
-    get_file_stream,
-    get_http_stream,
-    get_radio_stream,
-    resolve_radio_stream,
-)
 from music_assistant.server.helpers.tags import AudioTags, parse_tags
 from music_assistant.server.models.music_provider import MusicProvider
 
 if TYPE_CHECKING:
-    from collections.abc import AsyncGenerator
-
     from music_assistant.common.models.config_entries import (
         ConfigEntry,
         ConfigValueType,
@@ -179,8 +170,7 @@ class URLProvider(MusicProvider):
         if cached_info and not force_refresh:
             return AudioTags.parse(cached_info)
         # parse info with ffprobe (and store in cache)
-        resolved_url, _, _ = await resolve_radio_stream(self.mass, url)
-        media_info = await parse_tags(resolved_url)
+        media_info = await parse_tags(url)
         if "authSig" in url:
             media_info.has_cover_image = False
         await self.mass.cache.set(cache_key, media_info.raw)
@@ -199,28 +189,7 @@ class URLProvider(MusicProvider):
                 bit_depth=media_info.bits_per_sample,
             ),
             media_type=MediaType.RADIO if is_radio else MediaType.TRACK,
-            data={"url": item_id},
+            stream_type=StreamType.HTTP,
+            path=item_id,
+            can_seek=not is_radio,
         )
-
-    async def get_audio_stream(
-        self, streamdetails: StreamDetails, seek_position: int = 0
-    ) -> AsyncGenerator[bytes, None]:
-        """Return the audio stream for the provider item."""
-        if streamdetails.media_type == MediaType.RADIO:
-            # radio stream url
-            async for chunk in get_radio_stream(
-                self.mass, streamdetails.data["url"], streamdetails
-            ):
-                yield chunk
-        elif os.path.isfile(streamdetails.data):
-            # local file
-            async for chunk in get_file_stream(
-                self.mass, streamdetails.data["url"], streamdetails, seek_position
-            ):
-                yield chunk
-        else:
-            # regular stream url (without icy meta)
-            async for chunk in get_http_stream(
-                self.mass, streamdetails.data["url"], streamdetails, seek_position
-            ):
-                yield chunk
index 76fc93d263474cdb19c7a6d468816738e1534919..12a635c80eb552a87cddb7286e24277909f97998 100644 (file)
@@ -12,10 +12,9 @@ from typing import TYPE_CHECKING
 from urllib.parse import unquote
 
 import pytube
-from aiohttp import ClientResponseError
 
 from music_assistant.common.models.config_entries import ConfigEntry, ConfigValueType
-from music_assistant.common.models.enums import ConfigEntryType, ProviderFeature
+from music_assistant.common.models.enums import ConfigEntryType, ProviderFeature, StreamType
 from music_assistant.common.models.errors import (
     InvalidDataError,
     LoginFailed,
@@ -40,7 +39,6 @@ from music_assistant.common.models.media_items import (
     Track,
 )
 from music_assistant.common.models.streamdetails import StreamDetails
-from music_assistant.server.helpers.audio import get_http_stream
 from music_assistant.server.helpers.auth import AuthenticationHelper
 from music_assistant.server.models.music_provider import MusicProvider
 
@@ -535,44 +533,15 @@ class YoutubeMusicProvider(MusicProvider):
             audio_format=AudioFormat(
                 content_type=ContentType.try_parse(stream_format["mimeType"]),
             ),
-            data=url,
+            stream_type=StreamType.HTTP,
+            path=url,
         )
-        if (
-            track_obj["streamingData"].get("expiresInSeconds")
-            and track_obj["streamingData"].get("expiresInSeconds").isdigit()
-        ):
-            stream_details.expires = time() + int(
-                track_obj["streamingData"].get("expiresInSeconds")
-            )
-        else:
-            stream_details.expires = time() + 600
         if stream_format.get("audioChannels") and str(stream_format.get("audioChannels")).isdigit():
             stream_details.audio_format.channels = int(stream_format.get("audioChannels"))
         if stream_format.get("audioSampleRate") and stream_format.get("audioSampleRate").isdigit():
             stream_details.audio_format.sample_rate = int(stream_format.get("audioSampleRate"))
         return stream_details
 
-    async def get_audio_stream(
-        self, streamdetails: StreamDetails, seek_position: int = 0
-    ) -> AsyncGenerator[bytes, None]:
-        """Return the audio stream for the provider item."""
-        is_retry = False
-        while True:
-            try:
-                async for chunk in get_http_stream(
-                    self.mass, streamdetails.data, streamdetails, seek_position
-                ):
-                    yield chunk
-                return
-            except ClientResponseError as err:
-                if not is_retry and err.status == 403:
-                    # cipher expired, get a fresh one
-                    self.logger.warning("Cipher expired, trying to refresh...")
-                    streamdetails = await self.get_stream_details(streamdetails.item_id)
-                    continue
-                # raise for all other cases or we have already retried
-                raise
-
     async def _post_data(self, endpoint: str, data: dict[str, str], **kwargs):
         """Post data to the given endpoint."""
         await self._check_oauth_token()
@@ -908,6 +877,8 @@ class YoutubeMusicProvider(MusicProvider):
             "AUDIO_QUALITY_MEDIUM": 2,
             "AUDIO_QUALITY_HIGH": 3,
         }
+        if "streamingData" not in track_obj:
+            raise MediaNotFoundError("No stream found for this track")
         for adaptive_format in track_obj["streamingData"]["adaptiveFormats"]:
             if adaptive_format["mimeType"].startswith("audio") and (
                 not stream_format
@@ -916,6 +887,5 @@ class YoutubeMusicProvider(MusicProvider):
             ):
                 stream_format = adaptive_format
         if stream_format is None:
-            msg = "No stream found for this track"
-            raise MediaNotFoundError(msg)
+            raise MediaNotFoundError("No stream found for this track")
         return stream_format