Use direct stream whenever possible to minimize overhead (#411)
authorMarcel van der Veldt <m.vanderveldt@outlook.com>
Wed, 13 Jul 2022 23:21:55 +0000 (01:21 +0200)
committerGitHub <noreply@github.com>
Wed, 13 Jul 2022 23:21:55 +0000 (01:21 +0200)
Use direct stream in ffmpeg if supported by source

Minimize overhead by allowing ffmpeg to consume the file/url directly.

music_assistant/helpers/audio.py
music_assistant/models/media_items.py
music_assistant/models/player_queue.py
music_assistant/music_providers/filesystem.py
music_assistant/music_providers/qobuz.py
music_assistant/music_providers/url.py
music_assistant/music_providers/ytmusic/ytmusic.py

index 444b722811679b3fd19eaa09ced372ecb073803c..23abd6419870d059aab7c7d5e2aec1c5b0b5fd17 100644 (file)
@@ -153,10 +153,11 @@ async def analyze_audio(mass: MusicAssistant, streamdetails: StreamDetails) -> N
     LOGGER.debug("Start analyzing track %s", streamdetails.uri)
     # calculate BS.1770 R128 integrated loudness with ffmpeg
     started = time()
+    input_file = streamdetails.direct or "-"
     proc_args = [
         "ffmpeg",
         "-i",
-        "-",
+        input_file,
         "-f",
         streamdetails.content_type.value,
         "-af",
@@ -166,7 +167,10 @@ async def analyze_audio(mass: MusicAssistant, streamdetails: StreamDetails) -> N
         "-",
     ]
     async with AsyncProcess(
-        proc_args, True, enable_stdout=False, enable_stderr=True
+        proc_args,
+        enable_stdin=streamdetails.direct is None,
+        enable_stdout=False,
+        enable_stderr=True,
     ) as ffmpeg_proc:
 
         async def writer():
@@ -179,9 +183,10 @@ async def analyze_audio(mass: MusicAssistant, streamdetails: StreamDetails) -> N
                     break
             ffmpeg_proc.write_eof()
 
-        writer_task = ffmpeg_proc.attach_task(writer())
-        # wait for the writer task to finish
-        await writer_task
+        if streamdetails.direct is None:
+            writer_task = ffmpeg_proc.attach_task(writer())
+            # wait for the writer task to finish
+            await writer_task
 
         _, stderr = await ffmpeg_proc.communicate()
         try:
@@ -363,13 +368,17 @@ async def get_media_stream(
     """Get the PCM audio stream for the given streamdetails."""
     assert pcm_fmt.is_pcm(), "Output format must be a PCM type"
     args = await _get_ffmpeg_args(
-        streamdetails, pcm_fmt, pcm_sample_rate=sample_rate, pcm_channels=channels
+        streamdetails,
+        pcm_fmt,
+        pcm_sample_rate=sample_rate,
+        pcm_channels=channels,
+        seek_position=seek_position,
     )
-    async with AsyncProcess(args, enable_stdin=True) as ffmpeg_proc:
+    async with AsyncProcess(
+        args, enable_stdin=streamdetails.direct is None
+    ) as ffmpeg_proc:
 
-        LOGGER.debug(
-            "start media stream for: %s, using args: %s", streamdetails.uri, str(args)
-        )
+        LOGGER.debug("start media stream for: %s", streamdetails.uri)
 
         async def writer():
             """Task that grabs the source audio and feeds it to ffmpeg."""
@@ -383,7 +392,8 @@ async def get_media_stream(
             ffmpeg_proc.write_eof()
             LOGGER.debug("writer finished for %s", streamdetails.uri)
 
-        ffmpeg_proc.attach_task(writer())
+        if streamdetails.direct is None:
+            ffmpeg_proc.attach_task(writer())
 
         # yield chunks from stdout
         try:
@@ -399,6 +409,9 @@ async def get_media_stream(
                 streamdetails.item_id, streamdetails.provider
             )
         finally:
+            # report playback
+            if streamdetails.callback:
+                mass.create_task(streamdetails.callback, streamdetails)
             # send analyze job to background worker
             if streamdetails.loudness is None:
                 mass.add_job(
@@ -645,6 +658,7 @@ async def _get_ffmpeg_args(
     pcm_output_format: ContentType,
     pcm_sample_rate: int,
     pcm_channels: int = 2,
+    seek_position: int = 0,
 ) -> List[str]:
     """Collect all args to send to the ffmpeg process."""
     input_format = streamdetails.content_type
@@ -665,9 +679,17 @@ async def _get_ffmpeg_args(
         "quiet",
         "-ignore_unknown",
     ]
-    if streamdetails.content_type != ContentType.UNKNOWN:
-        input_args += ["-f", input_format.value]
-    input_args += ["-i", "-"]
+    if streamdetails.direct:
+        # ffmpeg can access the inputfile (or url) directly
+        if seek_position:
+            input_args += ["-ss", str(seek_position)]
+        input_args += ["-i", streamdetails.direct]
+    else:
+        # the input is received from pipe/stdin
+        if streamdetails.content_type != ContentType.UNKNOWN:
+            input_args += ["-f", input_format.value]
+        input_args += ["-i", "-"]
+
     # collect output args
     output_args = [
         "-acodec",
index 56ed62d79a41d4e5d450bdf0c02e1d8dae5abbd9..a73f3c6778d4041f27a7e14c1c936fcc95d9237a 100755 (executable)
@@ -407,6 +407,13 @@ class StreamDetails(DataClassDictMixin):
     expires: float = time() + 3600
     # data: provider specific data (not exposed externally)
     data: Optional[Any] = None
+    # if the url/file is supported by ffmpeg directly, use direct stream
+    direct: Optional[str] = None
+    # callback: optional callback function (or coroutine) to call when the stream completes.
+    # needed for streaming provivders to report what is playing
+    # receives the streamdetails as only argument from which to grab
+    # details such as seconds_streamed.
+    callback: Any = None
 
     # the fields below will be set/controlled by the streamcontroller
     queue_id: Optional[str] = None
@@ -418,8 +425,10 @@ class StreamDetails(DataClassDictMixin):
     def __post_serialize__(self, d: Dict[Any, Any]) -> Dict[Any, Any]:
         """Exclude internal fields from dict."""
         d.pop("data")
+        d.pop("direct")
         d.pop("expires")
         d.pop("queue_id")
+        d.pop("callback")
         return d
 
     def __str__(self):
index 6e9d0403370de626b16bc6c613d976e4fbbccdcb..be9c2b2cc015b855fa3ab1e8e2550197bf7fce93 100644 (file)
@@ -2,6 +2,7 @@
 from __future__ import annotations
 
 import asyncio
+import os
 import pathlib
 import random
 from asyncio import TimerHandle
@@ -38,7 +39,11 @@ RESOURCES_DIR = (
     .parent.resolve()
     .joinpath("helpers/resources")
 )
+
 ALERT_ANNOUNCE_FILE = str(RESOURCES_DIR.joinpath("announce.flac"))
+if not os.path.isfile(ALERT_ANNOUNCE_FILE):
+    ALERT_ANNOUNCE_FILE = None
+
 FALLBACK_DURATION = 172800  # if duration is None (e.g. radio stream) = 48 hours
 
 
@@ -297,7 +302,7 @@ class PlayerQueue:
                     media_type=MediaType.ANNOUNCEMENT,
                     loudness=0,
                     gain_correct=4,
-                    data=_url,
+                    direct=_url,
                 ),
                 media_type=MediaType.ANNOUNCEMENT,
             )
@@ -338,7 +343,7 @@ class PlayerQueue:
 
             queue_items = []
             # prepend alert sound if needed
-            if prepend_alert:
+            if prepend_alert and ALERT_ANNOUNCE_FILE:
                 queue_items.append(create_announcement(ALERT_ANNOUNCE_FILE))
 
             queue_items.append(create_announcement(url))
index 8b965cbd2adebdbeb538439d5df7e328f7fcce97..00d59a4b53e88ef6e16416358215cf6d6c2c5b23 100644 (file)
@@ -15,7 +15,6 @@ import xmltodict
 from aiofiles.os import wrap
 from aiofiles.threadpool.binary import AsyncFileIO
 
-from music_assistant.helpers.audio import get_file_stream
 from music_assistant.helpers.compare import compare_strings
 from music_assistant.helpers.tags import FALLBACK_ARTIST, parse_tags, split_items
 from music_assistant.helpers.util import create_safe_string, parse_title_and_version
@@ -448,18 +447,9 @@ class FileSystemProvider(MusicProvider):
             size=stat.st_size,
             sample_rate=metadata.sample_rate,
             bit_depth=metadata.bits_per_sample,
-            data=itempath,
+            direct=itempath,
         )
 
-    async def get_audio_stream(
-        self, streamdetails: StreamDetails, seek_position: int = 0
-    ) -> AsyncGenerator[bytes, None]:
-        """Return the audio stream for the provider item."""
-        async for chunk in get_file_stream(
-            self.mass, streamdetails.data, streamdetails, seek_position
-        ):
-            yield chunk
-
     async def _parse_track(self, track_path: str) -> Track | None:
         """Try to parse a track from a filename by reading its tags."""
 
index 20d560def221aac031bc8e2cf19c2501229b3387..ae6a18e04ef0d0799d2f243068b8abdfe496720c 100644 (file)
@@ -13,7 +13,6 @@ from asyncio_throttle import Throttler
 from music_assistant.helpers.app_vars import (  # pylint: disable=no-name-in-module
     app_var,
 )
-from music_assistant.helpers.audio import get_http_stream
 from music_assistant.helpers.util import parse_title_and_version, try_parse_int
 from music_assistant.models.enums import ProviderType
 from music_assistant.models.errors import LoginFailed, MediaNotFoundError
@@ -334,6 +333,8 @@ class QobuzProvider(MusicProvider):
             content_type = ContentType.FLAC
         else:
             raise MediaNotFoundError(f"Unsupported mime type for {item_id}")
+        # report playback started as soon as the streamdetails are requested
+        self.mass.create_task(self._report_playback_started(item_id, streamdata))
         return StreamDetails(
             item_id=str(item_id),
             provider=self.type,
@@ -343,28 +344,11 @@ class QobuzProvider(MusicProvider):
             bit_depth=streamdata["bit_depth"],
             data=streamdata,  # we need these details for reporting playback
             expires=time.time() + 1800,  # not sure about the real allowed value
+            direct=streamdata["url"],
+            callback=self._report_playback_stopped,
         )
 
-    async def get_audio_stream(
-        self, streamdetails: StreamDetails, seek_position: int = 0
-    ) -> AsyncGenerator[bytes, None]:
-        """Return the audio stream for the provider item."""
-        self.mass.create_task(self._report_playback_started(streamdetails))
-        bytes_sent = 0
-        try:
-            url = streamdetails.data["url"]
-            async for chunk in get_http_stream(
-                self.mass, url, streamdetails, seek_position
-            ):
-                yield chunk
-                bytes_sent += len(chunk)
-        finally:
-            if bytes_sent:
-                self.mass.create_task(
-                    self._report_playback_stopped(streamdetails, bytes_sent)
-                )
-
-    async def _report_playback_started(self, streamdetails: StreamDetails) -> None:
+    async def _report_playback_started(self, item_id: str, streamdata: dict) -> None:
         """Report playback start to qobuz."""
         # TODO: need to figure out if the streamed track is purchased by user
         # https://www.qobuz.com/api.json/0.2/purchase/getUserPurchasesIds?limit=5000&user_id=xxxxxxx
@@ -372,7 +356,7 @@ class QobuzProvider(MusicProvider):
         device_id = self._user_auth_info["user"]["device"]["id"]
         credential_id = self._user_auth_info["user"]["credential"]["id"]
         user_id = self._user_auth_info["user"]["id"]
-        format_id = streamdetails.data["format_id"]
+        format_id = streamdata["format_id"]
         timestamp = int(time.time())
         events = [
             {
@@ -380,7 +364,7 @@ class QobuzProvider(MusicProvider):
                 "sample": False,
                 "intent": "stream",
                 "device_id": device_id,
-                "track_id": str(streamdetails.item_id),
+                "track_id": str(item_id),
                 "purchase": False,
                 "date": timestamp,
                 "credential_id": credential_id,
@@ -391,9 +375,7 @@ class QobuzProvider(MusicProvider):
         ]
         await self._post_data("track/reportStreamingStart", data=events)
 
-    async def _report_playback_stopped(
-        self, streamdetails: StreamDetails, bytes_sent: int
-    ) -> None:
+    async def _report_playback_stopped(self, streamdetails: StreamDetails) -> None:
         """Report playback stop to qobuz."""
         user_id = self._user_auth_info["user"]["id"]
         await self._get_data(
index 990d79d5ac143fd4f2baf729ef4c822ffd31ae24..868b6a4f3ef31df33eeead75e887787015ab212e 100644 (file)
@@ -161,7 +161,7 @@ class URLProvider(MusicProvider):
             media_type=MediaType.RADIO if is_radio else MediaType.TRACK,
             sample_rate=media_info.sample_rate,
             bit_depth=media_info.bits_per_sample,
-            data=url,
+            direct=None if is_radio else url,
         )
 
     async def get_audio_stream(
index 3272d05bf296f3608bb8cabbf58ff8276d49caaf..4a8085c582e7ace0c672fe30f30c281e318d6306 100644 (file)
@@ -7,7 +7,6 @@ from urllib.parse import unquote
 import pytube
 import ytmusicapi
 
-from music_assistant.helpers.audio import get_http_stream
 from music_assistant.models.enums import ProviderType
 from music_assistant.models.errors import (
     InvalidDataError,
@@ -245,19 +244,10 @@ class YoutubeMusicProvider(MusicProvider):
         return StreamDetails(
             provider=self.type,
             item_id=item_id,
-            data=url,
             content_type=ContentType.try_parse(stream_format["mimeType"]),
+            direct=url,
         )
 
-    async def get_audio_stream(
-        self, streamdetails: StreamDetails, seek_position: int = 0
-    ) -> AsyncGenerator[bytes, None]:
-        """Return the audio stream for the provider item."""
-        async for chunk in get_http_stream(
-            self.mass, streamdetails.data, streamdetails, seek_position
-        ):
-            yield chunk
-
     async def _post_data(self, endpoint: str, data: Dict[str, str], **kwargs):
         url = f"{YTM_BASE_URL}{endpoint}"
         data.update(self._context)