From 563f54a4e65ab8d71004824c9e3adb4f004ac1f4 Mon Sep 17 00:00:00 2001 From: Marcel van der Veldt Date: Thu, 14 Jul 2022 01:21:55 +0200 Subject: [PATCH] Use direct stream whenever possible to minimize overhead (#411) Use direct stream in ffmpeg if supported by source Minimize overhead by allowing ffmpeg to consume the file/url directly. --- music_assistant/helpers/audio.py | 50 +++++++++++++------ music_assistant/models/media_items.py | 9 ++++ music_assistant/models/player_queue.py | 9 +++- music_assistant/music_providers/filesystem.py | 12 +---- music_assistant/music_providers/qobuz.py | 34 +++---------- music_assistant/music_providers/url.py | 2 +- .../music_providers/ytmusic/ytmusic.py | 12 +---- 7 files changed, 63 insertions(+), 65 deletions(-) diff --git a/music_assistant/helpers/audio.py b/music_assistant/helpers/audio.py index 444b7228..23abd641 100644 --- a/music_assistant/helpers/audio.py +++ b/music_assistant/helpers/audio.py @@ -153,10 +153,11 @@ async def analyze_audio(mass: MusicAssistant, streamdetails: StreamDetails) -> N LOGGER.debug("Start analyzing track %s", streamdetails.uri) # calculate BS.1770 R128 integrated loudness with ffmpeg started = time() + input_file = streamdetails.direct or "-" proc_args = [ "ffmpeg", "-i", - "-", + input_file, "-f", streamdetails.content_type.value, "-af", @@ -166,7 +167,10 @@ async def analyze_audio(mass: MusicAssistant, streamdetails: StreamDetails) -> N "-", ] async with AsyncProcess( - proc_args, True, enable_stdout=False, enable_stderr=True + proc_args, + enable_stdin=streamdetails.direct is None, + enable_stdout=False, + enable_stderr=True, ) as ffmpeg_proc: async def writer(): @@ -179,9 +183,10 @@ async def analyze_audio(mass: MusicAssistant, streamdetails: StreamDetails) -> N break ffmpeg_proc.write_eof() - writer_task = ffmpeg_proc.attach_task(writer()) - # wait for the writer task to finish - await writer_task + if streamdetails.direct is None: + writer_task = ffmpeg_proc.attach_task(writer()) + # wait for the writer task to finish + await writer_task _, stderr = await ffmpeg_proc.communicate() try: @@ -363,13 +368,17 @@ async def get_media_stream( """Get the PCM audio stream for the given streamdetails.""" assert pcm_fmt.is_pcm(), "Output format must be a PCM type" args = await _get_ffmpeg_args( - streamdetails, pcm_fmt, pcm_sample_rate=sample_rate, pcm_channels=channels + streamdetails, + pcm_fmt, + pcm_sample_rate=sample_rate, + pcm_channels=channels, + seek_position=seek_position, ) - async with AsyncProcess(args, enable_stdin=True) as ffmpeg_proc: + async with AsyncProcess( + args, enable_stdin=streamdetails.direct is None + ) as ffmpeg_proc: - LOGGER.debug( - "start media stream for: %s, using args: %s", streamdetails.uri, str(args) - ) + LOGGER.debug("start media stream for: %s", streamdetails.uri) async def writer(): """Task that grabs the source audio and feeds it to ffmpeg.""" @@ -383,7 +392,8 @@ async def get_media_stream( ffmpeg_proc.write_eof() LOGGER.debug("writer finished for %s", streamdetails.uri) - ffmpeg_proc.attach_task(writer()) + if streamdetails.direct is None: + ffmpeg_proc.attach_task(writer()) # yield chunks from stdout try: @@ -399,6 +409,9 @@ async def get_media_stream( streamdetails.item_id, streamdetails.provider ) finally: + # report playback + if streamdetails.callback: + mass.create_task(streamdetails.callback, streamdetails) # send analyze job to background worker if streamdetails.loudness is None: mass.add_job( @@ -645,6 +658,7 @@ async def _get_ffmpeg_args( pcm_output_format: ContentType, pcm_sample_rate: int, pcm_channels: int = 2, + seek_position: int = 0, ) -> List[str]: """Collect all args to send to the ffmpeg process.""" input_format = streamdetails.content_type @@ -665,9 +679,17 @@ async def _get_ffmpeg_args( "quiet", "-ignore_unknown", ] - if streamdetails.content_type != ContentType.UNKNOWN: - input_args += ["-f", input_format.value] - input_args += ["-i", "-"] + if streamdetails.direct: + # ffmpeg can access the inputfile (or url) directly + if seek_position: + input_args += ["-ss", str(seek_position)] + input_args += ["-i", streamdetails.direct] + else: + # the input is received from pipe/stdin + if streamdetails.content_type != ContentType.UNKNOWN: + input_args += ["-f", input_format.value] + input_args += ["-i", "-"] + # collect output args output_args = [ "-acodec", diff --git a/music_assistant/models/media_items.py b/music_assistant/models/media_items.py index 56ed62d7..a73f3c67 100755 --- a/music_assistant/models/media_items.py +++ b/music_assistant/models/media_items.py @@ -407,6 +407,13 @@ class StreamDetails(DataClassDictMixin): expires: float = time() + 3600 # data: provider specific data (not exposed externally) data: Optional[Any] = None + # if the url/file is supported by ffmpeg directly, use direct stream + direct: Optional[str] = None + # callback: optional callback function (or coroutine) to call when the stream completes. + # needed for streaming provivders to report what is playing + # receives the streamdetails as only argument from which to grab + # details such as seconds_streamed. + callback: Any = None # the fields below will be set/controlled by the streamcontroller queue_id: Optional[str] = None @@ -418,8 +425,10 @@ class StreamDetails(DataClassDictMixin): def __post_serialize__(self, d: Dict[Any, Any]) -> Dict[Any, Any]: """Exclude internal fields from dict.""" d.pop("data") + d.pop("direct") d.pop("expires") d.pop("queue_id") + d.pop("callback") return d def __str__(self): diff --git a/music_assistant/models/player_queue.py b/music_assistant/models/player_queue.py index 6e9d0403..be9c2b2c 100644 --- a/music_assistant/models/player_queue.py +++ b/music_assistant/models/player_queue.py @@ -2,6 +2,7 @@ from __future__ import annotations import asyncio +import os import pathlib import random from asyncio import TimerHandle @@ -38,7 +39,11 @@ RESOURCES_DIR = ( .parent.resolve() .joinpath("helpers/resources") ) + ALERT_ANNOUNCE_FILE = str(RESOURCES_DIR.joinpath("announce.flac")) +if not os.path.isfile(ALERT_ANNOUNCE_FILE): + ALERT_ANNOUNCE_FILE = None + FALLBACK_DURATION = 172800 # if duration is None (e.g. radio stream) = 48 hours @@ -297,7 +302,7 @@ class PlayerQueue: media_type=MediaType.ANNOUNCEMENT, loudness=0, gain_correct=4, - data=_url, + direct=_url, ), media_type=MediaType.ANNOUNCEMENT, ) @@ -338,7 +343,7 @@ class PlayerQueue: queue_items = [] # prepend alert sound if needed - if prepend_alert: + if prepend_alert and ALERT_ANNOUNCE_FILE: queue_items.append(create_announcement(ALERT_ANNOUNCE_FILE)) queue_items.append(create_announcement(url)) diff --git a/music_assistant/music_providers/filesystem.py b/music_assistant/music_providers/filesystem.py index 8b965cbd..00d59a4b 100644 --- a/music_assistant/music_providers/filesystem.py +++ b/music_assistant/music_providers/filesystem.py @@ -15,7 +15,6 @@ import xmltodict from aiofiles.os import wrap from aiofiles.threadpool.binary import AsyncFileIO -from music_assistant.helpers.audio import get_file_stream from music_assistant.helpers.compare import compare_strings from music_assistant.helpers.tags import FALLBACK_ARTIST, parse_tags, split_items from music_assistant.helpers.util import create_safe_string, parse_title_and_version @@ -448,18 +447,9 @@ class FileSystemProvider(MusicProvider): size=stat.st_size, sample_rate=metadata.sample_rate, bit_depth=metadata.bits_per_sample, - data=itempath, + direct=itempath, ) - async def get_audio_stream( - self, streamdetails: StreamDetails, seek_position: int = 0 - ) -> AsyncGenerator[bytes, None]: - """Return the audio stream for the provider item.""" - async for chunk in get_file_stream( - self.mass, streamdetails.data, streamdetails, seek_position - ): - yield chunk - async def _parse_track(self, track_path: str) -> Track | None: """Try to parse a track from a filename by reading its tags.""" diff --git a/music_assistant/music_providers/qobuz.py b/music_assistant/music_providers/qobuz.py index 20d560de..ae6a18e0 100644 --- a/music_assistant/music_providers/qobuz.py +++ b/music_assistant/music_providers/qobuz.py @@ -13,7 +13,6 @@ from asyncio_throttle import Throttler from music_assistant.helpers.app_vars import ( # pylint: disable=no-name-in-module app_var, ) -from music_assistant.helpers.audio import get_http_stream from music_assistant.helpers.util import parse_title_and_version, try_parse_int from music_assistant.models.enums import ProviderType from music_assistant.models.errors import LoginFailed, MediaNotFoundError @@ -334,6 +333,8 @@ class QobuzProvider(MusicProvider): content_type = ContentType.FLAC else: raise MediaNotFoundError(f"Unsupported mime type for {item_id}") + # report playback started as soon as the streamdetails are requested + self.mass.create_task(self._report_playback_started(item_id, streamdata)) return StreamDetails( item_id=str(item_id), provider=self.type, @@ -343,28 +344,11 @@ class QobuzProvider(MusicProvider): bit_depth=streamdata["bit_depth"], data=streamdata, # we need these details for reporting playback expires=time.time() + 1800, # not sure about the real allowed value + direct=streamdata["url"], + callback=self._report_playback_stopped, ) - async def get_audio_stream( - self, streamdetails: StreamDetails, seek_position: int = 0 - ) -> AsyncGenerator[bytes, None]: - """Return the audio stream for the provider item.""" - self.mass.create_task(self._report_playback_started(streamdetails)) - bytes_sent = 0 - try: - url = streamdetails.data["url"] - async for chunk in get_http_stream( - self.mass, url, streamdetails, seek_position - ): - yield chunk - bytes_sent += len(chunk) - finally: - if bytes_sent: - self.mass.create_task( - self._report_playback_stopped(streamdetails, bytes_sent) - ) - - async def _report_playback_started(self, streamdetails: StreamDetails) -> None: + async def _report_playback_started(self, item_id: str, streamdata: dict) -> None: """Report playback start to qobuz.""" # TODO: need to figure out if the streamed track is purchased by user # https://www.qobuz.com/api.json/0.2/purchase/getUserPurchasesIds?limit=5000&user_id=xxxxxxx @@ -372,7 +356,7 @@ class QobuzProvider(MusicProvider): device_id = self._user_auth_info["user"]["device"]["id"] credential_id = self._user_auth_info["user"]["credential"]["id"] user_id = self._user_auth_info["user"]["id"] - format_id = streamdetails.data["format_id"] + format_id = streamdata["format_id"] timestamp = int(time.time()) events = [ { @@ -380,7 +364,7 @@ class QobuzProvider(MusicProvider): "sample": False, "intent": "stream", "device_id": device_id, - "track_id": str(streamdetails.item_id), + "track_id": str(item_id), "purchase": False, "date": timestamp, "credential_id": credential_id, @@ -391,9 +375,7 @@ class QobuzProvider(MusicProvider): ] await self._post_data("track/reportStreamingStart", data=events) - async def _report_playback_stopped( - self, streamdetails: StreamDetails, bytes_sent: int - ) -> None: + async def _report_playback_stopped(self, streamdetails: StreamDetails) -> None: """Report playback stop to qobuz.""" user_id = self._user_auth_info["user"]["id"] await self._get_data( diff --git a/music_assistant/music_providers/url.py b/music_assistant/music_providers/url.py index 990d79d5..868b6a4f 100644 --- a/music_assistant/music_providers/url.py +++ b/music_assistant/music_providers/url.py @@ -161,7 +161,7 @@ class URLProvider(MusicProvider): media_type=MediaType.RADIO if is_radio else MediaType.TRACK, sample_rate=media_info.sample_rate, bit_depth=media_info.bits_per_sample, - data=url, + direct=None if is_radio else url, ) async def get_audio_stream( diff --git a/music_assistant/music_providers/ytmusic/ytmusic.py b/music_assistant/music_providers/ytmusic/ytmusic.py index 3272d05b..4a8085c5 100644 --- a/music_assistant/music_providers/ytmusic/ytmusic.py +++ b/music_assistant/music_providers/ytmusic/ytmusic.py @@ -7,7 +7,6 @@ from urllib.parse import unquote import pytube import ytmusicapi -from music_assistant.helpers.audio import get_http_stream from music_assistant.models.enums import ProviderType from music_assistant.models.errors import ( InvalidDataError, @@ -245,19 +244,10 @@ class YoutubeMusicProvider(MusicProvider): return StreamDetails( provider=self.type, item_id=item_id, - data=url, content_type=ContentType.try_parse(stream_format["mimeType"]), + direct=url, ) - async def get_audio_stream( - self, streamdetails: StreamDetails, seek_position: int = 0 - ) -> AsyncGenerator[bytes, None]: - """Return the audio stream for the provider item.""" - async for chunk in get_http_stream( - self.mass, streamdetails.data, streamdetails, seek_position - ): - yield chunk - async def _post_data(self, endpoint: str, data: Dict[str, str], **kwargs): url = f"{YTM_BASE_URL}{endpoint}" data.update(self._context) -- 2.34.1