From 756aa9f9fd76228a63fcc4f8594413823da222fd Mon Sep 17 00:00:00 2001 From: Marcel van der Veldt Date: Fri, 10 Jun 2022 23:29:31 +0200 Subject: [PATCH] Refactor streaming (#361) * refactored audio streaming for better (pre)buffering * Parse ICY metadata from radio stream * fix alert feature * set default volume normalisation target to -14 * use ffmpeg with libsoxr * fix playback on UPNP devices (e.g. Sonos) * Fix buffering issues * Optimize memory and cpu consumption while streaming --- music_assistant/constants.py | 3 + music_assistant/controllers/music/__init__.py | 4 + .../controllers/music/providers/filesystem.py | 19 +- .../controllers/music/providers/qobuz.py | 117 +-- .../controllers/music/providers/spotify.py | 36 +- .../controllers/music/providers/tunein.py | 23 +- .../controllers/music/providers/url.py | 74 ++ music_assistant/controllers/stream.py | 663 -------------- music_assistant/controllers/streams.py | 620 +++++++++++++ music_assistant/helpers/audio.py | 821 +++++++++--------- music_assistant/helpers/process.py | 103 ++- music_assistant/mass.py | 7 +- music_assistant/models/config.py | 2 +- music_assistant/models/enums.py | 28 +- music_assistant/models/media_items.py | 48 +- music_assistant/models/player.py | 30 + music_assistant/models/player_queue.py | 261 +++--- music_assistant/models/provider.py | 10 +- music_assistant/models/queue_item.py | 22 +- music_assistant/models/queue_settings.py | 15 +- 20 files changed, 1482 insertions(+), 1424 deletions(-) create mode 100644 music_assistant/controllers/music/providers/url.py delete mode 100644 music_assistant/controllers/stream.py create mode 100644 music_assistant/controllers/streams.py diff --git a/music_assistant/constants.py b/music_assistant/constants.py index 0d036d30..1e88116e 100755 --- a/music_assistant/constants.py +++ b/music_assistant/constants.py @@ -21,3 +21,6 @@ ATTR_CONFIG_ENTRIES = "config_entries" ATTR_UPDATED_AT = "updated_at" ATTR_ACTIVE_QUEUE = "active_queue" ATTR_GROUP_PARENTS = "group_parents" + + +ROOT_LOGGER_NAME = "music_assistant" diff --git a/music_assistant/controllers/music/__init__.py b/music_assistant/controllers/music/__init__.py index 5090a03d..abd47961 100755 --- a/music_assistant/controllers/music/__init__.py +++ b/music_assistant/controllers/music/__init__.py @@ -27,6 +27,8 @@ from .providers.filesystem import FileSystemProvider from .providers.qobuz import QobuzProvider from .providers.spotify import SpotifyProvider from .providers.tunein import TuneInProvider +from .providers.url import PROVIDER_CONFIG as URL_CONFIG +from .providers.url import URLProvider if TYPE_CHECKING: from music_assistant.mass import MusicAssistant @@ -59,6 +61,8 @@ class MusicController: for prov_conf in self.mass.config.providers: prov_cls = PROV_MAP[prov_conf.type] await self._register_provider(prov_cls(self.mass, prov_conf), prov_conf) + # always register url provider + await self._register_provider(URLProvider(self.mass, URL_CONFIG), URL_CONFIG) async def start_sync( self, diff --git a/music_assistant/controllers/music/providers/filesystem.py b/music_assistant/controllers/music/providers/filesystem.py index 904d28a4..e5ad97db 100644 --- a/music_assistant/controllers/music/providers/filesystem.py +++ b/music_assistant/controllers/music/providers/filesystem.py @@ -14,6 +14,7 @@ from aiofiles.os import wrap from aiofiles.threadpool.binary import AsyncFileIO from tinytag.tinytag import TinyTag +from music_assistant.helpers.audio import get_file_stream from music_assistant.helpers.compare import compare_strings from music_assistant.helpers.database import SCHEMA_VERSION from music_assistant.helpers.util import ( @@ -37,7 +38,6 @@ from music_assistant.models.media_items import ( MediaType, Playlist, StreamDetails, - StreamType, Track, ) from music_assistant.models.provider import MusicProvider @@ -435,16 +435,29 @@ class FileSystemProvider(MusicProvider): _, ext = Path(itempath).name.rsplit(".", 1) content_type = CONTENT_TYPE_EXT.get(ext.lower()) + stat = await self.mass.loop.run_in_executor(None, os.stat, itempath) + return StreamDetails( - type=StreamType.FILE, provider=self.type, item_id=item_id, content_type=content_type, - path=itempath, + media_type=MediaType.TRACK, + duration=tags.duration, + size=stat.st_size, sample_rate=tags.samplerate or 44100, bit_depth=16, # TODO: parse bitdepth + data=itempath, ) + async def get_audio_stream( + self, streamdetails: StreamDetails, seek_position: int = 0 + ) -> AsyncGenerator[bytes, None]: + """Return the audio stream for the provider item.""" + async for chunk in get_file_stream( + self.mass, streamdetails.data, streamdetails, seek_position + ): + yield chunk + async def _parse_track(self, track_path: str) -> Track | None: """Try to parse a track from a filename by reading its tags.""" diff --git a/music_assistant/controllers/music/providers/qobuz.py b/music_assistant/controllers/music/providers/qobuz.py index 8ced8464..65bbc48a 100644 --- a/music_assistant/controllers/music/providers/qobuz.py +++ b/music_assistant/controllers/music/providers/qobuz.py @@ -13,11 +13,11 @@ from asyncio_throttle import Throttler from music_assistant.helpers.app_vars import ( # pylint: disable=no-name-in-module app_var, ) +from music_assistant.helpers.audio import get_http_stream from music_assistant.helpers.cache import use_cache from music_assistant.helpers.util import parse_title_and_version, try_parse_int -from music_assistant.models.enums import EventType, ProviderType -from music_assistant.models.errors import LoginFailed -from music_assistant.models.event import MassEvent +from music_assistant.models.enums import ProviderType +from music_assistant.models.errors import LoginFailed, MediaNotFoundError from music_assistant.models.media_items import ( Album, AlbumType, @@ -31,7 +31,6 @@ from music_assistant.models.media_items import ( MediaType, Playlist, StreamDetails, - StreamType, Track, ) from music_assistant.models.provider import MusicProvider @@ -61,12 +60,6 @@ class QobuzProvider(MusicProvider): token = await self._auth_token() if not token: raise LoginFailed(f"Login failed for user {self.config.username}") - # subscribe to stream events so we can report playback to Qobuz - self.mass.subscribe( - self.on_stream_event, - (EventType.STREAM_STARTED, EventType.STREAM_ENDED), - id_filter=self.type.value, - ) return True async def search( @@ -338,69 +331,81 @@ class QobuzProvider(MusicProvider): streamdata = result break if not streamdata: - self.logger.error("Unable to retrieve stream details for track %s", item_id) - return None + raise MediaNotFoundError(f"Unable to retrieve stream details for {item_id}") if streamdata["mime_type"] == "audio/mpeg": content_type = ContentType.MPEG elif streamdata["mime_type"] == "audio/flac": content_type = ContentType.FLAC else: - self.logger.error("Unsupported mime type for track %s", item_id) - return None + raise MediaNotFoundError(f"Unsupported mime type for {item_id}") return StreamDetails( - type=StreamType.URL, item_id=str(item_id), provider=self.type, - path=streamdata["url"], content_type=content_type, + duration=streamdata["duration"], sample_rate=int(streamdata["sampling_rate"] * 1000), bit_depth=streamdata["bit_depth"], - details=streamdata, # we need these details for reporting playback + data=streamdata, # we need these details for reporting playback + expires=time.time() + 1800, # not sure about the real allowed value ) - async def on_stream_event(self, event: MassEvent): - """ - Received event from mass. + async def get_audio_stream( + self, streamdetails: StreamDetails, seek_position: int = 0 + ) -> AsyncGenerator[bytes, None]: + """Return the audio stream for the provider item.""" + self.mass.create_task(self._report_playback_started(streamdetails)) + bytes_sent = 0 + try: + url = streamdetails.data["url"] + async for chunk in get_http_stream( + self.mass, url, streamdetails, seek_position + ): + yield chunk + bytes_sent += len(chunk) + finally: + if bytes_sent: + self.mass.create_task( + self._report_playback_stopped(streamdetails, bytes_sent) + ) - We use this to report playback start/stop to qobuz. - """ - if not self._user_auth_info: - return + async def _report_playback_started(self, streamdetails: StreamDetails) -> None: + """Report playback start to qobuz.""" # TODO: need to figure out if the streamed track is purchased by user # https://www.qobuz.com/api.json/0.2/purchase/getUserPurchasesIds?limit=5000&user_id=xxxxxxx # {"albums":{"total":0,"items":[]},"tracks":{"total":0,"items":[]},"user":{"id":xxxx,"login":"xxxxx"}} - if event.type == EventType.STREAM_STARTED: - # report streaming started to qobuz - device_id = self._user_auth_info["user"]["device"]["id"] - credential_id = self._user_auth_info["user"]["credential"]["id"] - user_id = self._user_auth_info["user"]["id"] - format_id = event.data.details["format_id"] - timestamp = int(time.time()) - events = [ - { - "online": True, - "sample": False, - "intent": "stream", - "device_id": device_id, - "track_id": str(event.data.item_id), - "purchase": False, - "date": timestamp, - "credential_id": credential_id, - "user_id": user_id, - "local": False, - "format_id": format_id, - } - ] - await self._post_data("track/reportStreamingStart", data=events) - elif event.type == EventType.STREAM_ENDED: - # report streaming ended to qobuz - user_id = self._user_auth_info["user"]["id"] - await self._get_data( - "/track/reportStreamingEnd", - user_id=user_id, - track_id=str(event.data.item_id), - duration=try_parse_int(event.data.seconds_played), - ) + device_id = self._user_auth_info["user"]["device"]["id"] + credential_id = self._user_auth_info["user"]["credential"]["id"] + user_id = self._user_auth_info["user"]["id"] + format_id = streamdetails.data["format_id"] + timestamp = int(time.time()) + events = [ + { + "online": True, + "sample": False, + "intent": "stream", + "device_id": device_id, + "track_id": str(streamdetails.item_id), + "purchase": False, + "date": timestamp, + "credential_id": credential_id, + "user_id": user_id, + "local": False, + "format_id": format_id, + } + ] + await self._post_data("track/reportStreamingStart", data=events) + + async def _report_playback_stopped( + self, streamdetails: StreamDetails, bytes_sent: int + ) -> None: + """Report playback stop to qobuz.""" + user_id = self._user_auth_info["user"]["id"] + await self._get_data( + "/track/reportStreamingEnd", + user_id=user_id, + track_id=str(streamdetails.item_id), + duration=try_parse_int(streamdetails.seconds_streamed), + ) async def _parse_artist(self, artist_obj: dict): """Parse qobuz artist object to generic layout.""" diff --git a/music_assistant/controllers/music/providers/spotify.py b/music_assistant/controllers/music/providers/spotify.py index 1ea97381..83d1f85f 100644 --- a/music_assistant/controllers/music/providers/spotify.py +++ b/music_assistant/controllers/music/providers/spotify.py @@ -17,9 +17,10 @@ from music_assistant.helpers.app_vars import ( # noqa # pylint: disable=no-name app_var, ) from music_assistant.helpers.cache import use_cache +from music_assistant.helpers.process import AsyncProcess from music_assistant.helpers.util import parse_title_and_version from music_assistant.models.enums import ProviderType -from music_assistant.models.errors import LoginFailed +from music_assistant.models.errors import LoginFailed, MediaNotFoundError from music_assistant.models.media_items import ( Album, AlbumType, @@ -33,7 +34,6 @@ from music_assistant.models.media_items import ( MediaType, Playlist, StreamDetails, - StreamType, Track, ) from music_assistant.models.provider import MusicProvider @@ -276,21 +276,39 @@ class SpotifyProvider(MusicProvider): # make sure a valid track is requested. track = await self.get_track(item_id) if not track: - return None + raise MediaNotFoundError(f"track {item_id} not found") # make sure that the token is still valid by just requesting it await self.get_token() - librespot = await self.get_librespot_binary() - librespot_exec = f'{librespot} -c "{self._cache_dir}" --pass-through -b 320 --single-track spotify://track:{track.item_id}' return StreamDetails( - type=StreamType.EXECUTABLE, item_id=track.item_id, provider=self.type, - path=librespot_exec, content_type=ContentType.OGG, - sample_rate=44100, - bit_depth=16, + duration=track.duration, ) + async def get_audio_stream( + self, streamdetails: StreamDetails, seek_position: int = 0 + ) -> AsyncGenerator[bytes, None]: + """Return the audio stream for the provider item.""" + # make sure that the token is still valid by just requesting it + await self.get_token() + librespot = await self.get_librespot_binary() + args = [ + librespot, + "-c", + self._cache_dir, + "--pass-through", + "-b", + "320", + "--single-track", + f"spotify://track:{streamdetails.item_id}", + ] + if seek_position: + args += ["--start-position", str(int(seek_position))] + async with AsyncProcess(args) as librespot_proc: + async for chunk in librespot_proc.iterate_chunks(): + yield chunk + async def _parse_artist(self, artist_obj): """Parse spotify artist object to generic layout.""" artist = Artist( diff --git a/music_assistant/controllers/music/providers/tunein.py b/music_assistant/controllers/music/providers/tunein.py index 97691966..fe6ef60a 100644 --- a/music_assistant/controllers/music/providers/tunein.py +++ b/music_assistant/controllers/music/providers/tunein.py @@ -5,10 +5,11 @@ from typing import AsyncGenerator, List, Optional from asyncio_throttle import Throttler +from music_assistant.helpers.audio import get_radio_stream from music_assistant.helpers.cache import use_cache from music_assistant.helpers.util import create_clean_string from music_assistant.models.enums import ProviderType -from music_assistant.models.errors import LoginFailed +from music_assistant.models.errors import LoginFailed, MediaNotFoundError from music_assistant.models.media_items import ( ContentType, ImageType, @@ -19,7 +20,6 @@ from music_assistant.models.media_items import ( MediaType, Radio, StreamDetails, - StreamType, ) from music_assistant.models.provider import MusicProvider @@ -154,17 +154,22 @@ class TuneInProvider(MusicProvider): for stream in stream_info["body"]: if stream["media_type"] == media_type: return StreamDetails( - type=StreamType.URL, - item_id=item_id, provider=self.type, - path=stream["url"], + item_id=item_id, content_type=ContentType(stream["media_type"]), - sample_rate=44100, - bit_depth=16, media_type=MediaType.RADIO, - details=stream, + data=stream, ) - return None + raise MediaNotFoundError(f"Unable to retrieve stream details for {item_id}") + + async def get_audio_stream( + self, streamdetails: StreamDetails, seek_position: int = 0 + ) -> AsyncGenerator[bytes, None]: + """Return the audio stream for the provider item.""" + async for chunk in get_radio_stream( + self.mass, streamdetails.data["url"], streamdetails + ): + yield chunk @use_cache(3600 * 2) async def __get_data(self, endpoint: str, **kwargs): diff --git a/music_assistant/controllers/music/providers/url.py b/music_assistant/controllers/music/providers/url.py new file mode 100644 index 00000000..1dbb50f8 --- /dev/null +++ b/music_assistant/controllers/music/providers/url.py @@ -0,0 +1,74 @@ +"""Basic provider allowing for external URL's to be streamed.""" +from __future__ import annotations + +import os +from typing import AsyncGenerator, List, Optional + +from music_assistant.helpers.audio import ( + get_file_stream, + get_http_stream, + get_radio_stream, +) +from music_assistant.models.config import MusicProviderConfig +from music_assistant.models.enums import ContentType, MediaType, ProviderType +from music_assistant.models.media_items import MediaItemType, StreamDetails +from music_assistant.models.provider import MusicProvider + +PROVIDER_CONFIG = MusicProviderConfig(ProviderType.URL) + + +class URLProvider(MusicProvider): + """Music Provider for manual URL's/files added to the queue.""" + + _attr_name: str = "URL" + _attr_type: ProviderType = ProviderType.URL + _attr_available: bool = True + _attr_supported_mediatypes: List[MediaType] = [] + + async def setup(self) -> bool: + """ + Handle async initialization of the provider. + + Called when provider is registered. + """ + return True + + async def search( + self, search_query: str, media_types=Optional[List[MediaType]], limit: int = 5 + ) -> List[MediaItemType]: + """Perform search on musicprovider.""" + return [] + + async def get_stream_details(self, item_id: str) -> StreamDetails | None: + """Get streamdetails for a track/radio.""" + url = item_id + return StreamDetails( + provider=ProviderType.URL, + item_id=item_id, + content_type=ContentType.try_parse(url), + media_type=MediaType.URL, + data=url, + ) + + async def get_audio_stream( + self, streamdetails: StreamDetails, seek_position: int = 0 + ) -> AsyncGenerator[bytes, None]: + """Return the audio stream for the provider item.""" + if streamdetails.media_type == MediaType.RADIO: + # radio stream url + async for chunk in get_radio_stream( + self.mass, streamdetails.data, streamdetails + ): + yield chunk + elif os.path.isfile(streamdetails.data): + # local file + async for chunk in get_file_stream( + self.mass, streamdetails.data, streamdetails, seek_position + ): + yield chunk + else: + # regular stream url (without icy meta and reconnect) + async for chunk in get_http_stream( + self.mass, streamdetails.data, streamdetails, seek_position + ): + yield chunk diff --git a/music_assistant/controllers/stream.py b/music_assistant/controllers/stream.py deleted file mode 100644 index 12031eca..00000000 --- a/music_assistant/controllers/stream.py +++ /dev/null @@ -1,663 +0,0 @@ -"""Controller to stream audio to players.""" -from __future__ import annotations - -import asyncio -import urllib.parse -from asyncio import Task -from time import time -from typing import TYPE_CHECKING, AsyncGenerator, Dict, Optional, Set - -from aiohttp import web - -from music_assistant.helpers.audio import ( - check_audio_support, - create_wave_header, - crossfade_pcm_parts, - fadein_pcm_part, - get_media_stream, - get_preview_stream, - get_sox_args_for_pcm_stream, - get_stream_details, - strip_silence, -) -from music_assistant.helpers.process import AsyncProcess -from music_assistant.models.enums import ( - ContentType, - CrossFadeMode, - EventType, - MediaType, - ProviderType, -) -from music_assistant.models.errors import ( - MediaNotFoundError, - MusicAssistantError, - QueueEmpty, -) -from music_assistant.models.event import MassEvent -from music_assistant.models.player_queue import PlayerQueue, QueueItem - -if TYPE_CHECKING: - from music_assistant.mass import MusicAssistant - - -class StreamController: - """Controller to stream audio to players.""" - - def __init__(self, mass: MusicAssistant): - """Initialize instance.""" - self.mass = mass - self.logger = mass.logger.getChild("stream") - self._port = mass.config.stream_port - self._ip = mass.config.stream_ip - self._subscribers: Dict[str, Set[str]] = {} - self._client_queues: Dict[str, Dict[str, asyncio.Queue]] = {} - self._stream_tasks: Dict[str, Task] = {} - self._time_started: Dict[str, float] = {} - - def get_stream_url( - self, - queue_id: str, - child_player: Optional[str] = None, - content_type: ContentType = ContentType.FLAC, - ) -> str: - """Return the full stream url for the PlayerQueue Stream.""" - ext = content_type.value - if child_player: - return f"http://{self._ip}:{self._port}/{queue_id}/{child_player}.{ext}" - return f"http://{self._ip}:{self._port}/{queue_id}.{ext}" - - async def get_preview_url(self, provider: ProviderType, track_id: str) -> str: - """Return url to short preview sample.""" - track = await self.mass.music.tracks.get_provider_item(track_id, provider) - if preview := track.metadata.preview: - return preview - enc_track_id = urllib.parse.quote(track_id) - return f"http://{self._ip}:{self._port}/preview?provider_id={provider.value}&item_id={enc_track_id}" - - def get_silence_url(self, duration: int = 600) -> str: - """Return url to silence.""" - return f"http://{self._ip}:{self._port}/silence?duration={duration}" - - async def setup(self) -> None: - """Async initialize of module.""" - app = web.Application() - - app.router.add_get("/preview", self.serve_preview) - app.router.add_get("/silence", self.serve_silence) - app.router.add_get( - "/{queue_id}/{player_id}.{format}", - self.serve_multi_client_queue_stream, - ) - app.router.add_get("/{queue_id}.{format}", self.serve_queue_stream) - app.router.add_get("/{queue_id}", self.serve_queue_stream) - - runner = web.AppRunner(app, access_log=None) - await runner.setup() - # set host to None to bind to all addresses on both IPv4 and IPv6 - http_site = web.TCPSite(runner, host=None, port=self._port) - await http_site.start() - - async def on_shutdown_event(*event: MassEvent): - """Handle shutdown event.""" - await http_site.stop() - await runner.cleanup() - await app.shutdown() - await app.cleanup() - self.logger.info("Streamserver exited.") - - self.mass.subscribe(on_shutdown_event, EventType.SHUTDOWN) - - sox_present, ffmpeg_present = await check_audio_support(True) - if not ffmpeg_present and not sox_present: - self.logger.error( - "SoX or FFmpeg binary not found on your system, " - "playback will NOT work!." - ) - elif not ffmpeg_present: - self.logger.warning( - "The FFmpeg binary was not found on your system, " - "you might experience issues with playback. " - "Please install FFmpeg with your OS package manager.", - ) - elif not sox_present: - self.logger.warning( - "The SoX binary was not found on your system, FFmpeg is used as fallback." - ) - - self.logger.info("Started stream server on port %s", self._port) - - @staticmethod - async def serve_silence(request: web.Request): - """Serve silence.""" - resp = web.StreamResponse( - status=200, reason="OK", headers={"Content-Type": "audio/wav"} - ) - await resp.prepare(request) - duration = int(request.query.get("duration", 600)) - await resp.write(create_wave_header(duration=duration)) - for _ in range(0, duration): - await resp.write(b"\0" * 1764000) - return resp - - async def serve_preview(self, request: web.Request): - """Serve short preview sample.""" - provider_id = request.query["provider_id"] - item_id = urllib.parse.unquote(request.query["item_id"]) - resp = web.StreamResponse( - status=200, reason="OK", headers={"Content-Type": "audio/mp3"} - ) - await resp.prepare(request) - async for _, chunk in get_preview_stream(self.mass, provider_id, item_id): - await resp.write(chunk) - return resp - - async def serve_queue_stream(self, request: web.Request): - """Serve queue audio stream to a single player (encoded to fileformat of choice).""" - queue_id = request.match_info["queue_id"] - fmt = request.match_info.get("format", "flac") - queue = self.mass.players.get_player_queue(queue_id) - - if queue is None: - return web.Response(status=404) - - # prepare request - try: - start_streamdetails = await queue.queue_stream_prepare() - except QueueEmpty: - # send stop here to prevent the player from retrying over and over - await queue.stop() - # send some silence to allow the player to process the stop request - return await self.serve_silence(request) - - resp = web.StreamResponse( - status=200, reason="OK", headers={"Content-Type": f"audio/{fmt}"} - ) - await resp.prepare(request) - - output_fmt = ContentType(fmt) - # work out sample rate - if queue.settings.crossfade_mode == CrossFadeMode.ALWAYS: - sample_rate = min(96000, queue.max_sample_rate) - bit_depth = 24 - channels = 2 - resample = True - else: - sample_rate = start_streamdetails.sample_rate - bit_depth = start_streamdetails.bit_depth - channels = start_streamdetails.channels - resample = False - sox_args = await get_sox_args_for_pcm_stream( - sample_rate, - bit_depth, - channels, - output_format=output_fmt, - ) - # get the raw pcm bytes from the queue stream and on the fly encode to wanted format - # send the compressed/encoded stream to the client. - async with AsyncProcess(sox_args, True) as sox_proc: - - async def writer(): - # task that sends the raw pcm audio to the sox/ffmpeg process - async for audio_chunk in self._get_queue_stream( - queue, - sample_rate=sample_rate, - bit_depth=bit_depth, - channels=channels, - resample=resample, - ): - if sox_proc.closed: - return - await sox_proc.write(audio_chunk) - # write eof when last packet is received - sox_proc.write_eof() - - sox_proc.attach_task(writer()) - - # read bytes from final output - async for audio_chunk in sox_proc.iterate_chunks(): - await resp.write(audio_chunk) - - return resp - - async def serve_multi_client_queue_stream(self, request: web.Request): - """Serve queue audio stream to multiple (group)clients.""" - queue_id = request.match_info["queue_id"] - player_id = request.match_info["player_id"] - fmt = request.match_info.get("format", "flac") - queue = self.mass.players.get_player_queue(queue_id) - player = self.mass.players.get_player(player_id) - - if queue is None or player is None: - return web.Response(status=404) - - # prepare request - resp = web.StreamResponse( - status=200, - reason="OK", - headers={"Content-Type": f"audio/{fmt}"}, - ) - await resp.prepare(request) - - # start delivering audio chunks - await self.subscribe_client(queue_id, player_id) - try: - while True: - client_queue = self._client_queues.get(queue_id).get(player_id) - if not client_queue: - break - audio_chunk = await client_queue.get() - if audio_chunk == b"": - # eof - break - await resp.write(audio_chunk) - client_queue.task_done() - finally: - await self.unsubscribe_client(queue_id, player_id) - return resp - - async def subscribe_client(self, queue_id: str, player_id: str) -> None: - """Subscribe client to queue stream.""" - if queue_id not in self._stream_tasks: - raise MusicAssistantError(f"No Queue stream available for {queue_id}") - - if queue_id not in self._subscribers: - self._subscribers[queue_id] = set() - self._subscribers[queue_id].add(player_id) - - self.logger.debug( - "Subscribed player %s to multi queue stream %s", - player_id, - queue_id, - ) - - async def unsubscribe_client(self, queue_id: str, player_id: str): - """Unsubscribe client from queue stream.""" - if player_id in self._subscribers[queue_id]: - self._subscribers[queue_id].remove(player_id) - - self.__cleanup_client_queue(queue_id, player_id) - self.logger.debug( - "Unsubscribed player %s from multi queue stream %s", player_id, queue_id - ) - if len(self._subscribers[queue_id]) == 0: - # no more clients, cancel stream task - if task := self._stream_tasks.pop(queue_id, None): - self.logger.debug( - "Aborted multi queue stream %s due to no more clients", queue_id - ) - task.cancel() - - async def start_multi_client_queue_stream( - self, queue_id: str, expected_clients: Set[str], output_fmt: ContentType - ) -> None: - """Start the Queue stream feeding callbacks of listeners..""" - assert queue_id not in self._stream_tasks, "already running!" - - self._time_started[queue_id] = time() - - # create queue for expected clients - self._client_queues.setdefault(queue_id, {}) - for child_id in expected_clients: - self._client_queues[queue_id][child_id] = asyncio.Queue(10) - - self._stream_tasks[queue_id] = asyncio.create_task( - self.__multi_client_queue_stream_runner(queue_id, output_fmt) - ) - - async def stop_multi_client_queue_stream(self, queue_id: str) -> None: - """Signal a running queue stream task and its listeners to stop.""" - if queue_id not in self._stream_tasks: - return - - # send stop to child players - await asyncio.gather( - *[ - self.mass.players.get_player(client_id).stop() - for client_id in self._subscribers.get(queue_id, {}) - ] - ) - - # stop background task - if stream_task := self._stream_tasks.pop(queue_id, None): - stream_task.cancel() - - # wait for cleanup - while len(self._subscribers.get(queue_id, {})) != 0: - await asyncio.sleep(0.1) - while len(self._client_queues.get(queue_id, {})) != 0: - await asyncio.sleep(0.1) - - async def __multi_client_queue_stream_runner( - self, queue_id: str, output_fmt: ContentType - ): - """Distribute audio chunks over connected clients in a multi client queue stream.""" - queue = self.mass.players.get_player_queue(queue_id) - - start_streamdetails = await queue.queue_stream_prepare() - # work out sample rate - if queue.settings.crossfade_mode == CrossFadeMode.ALWAYS: - sample_rate = min(96000, queue.max_sample_rate) - bit_depth = 24 - channels = 2 - resample = True - else: - sample_rate = start_streamdetails.sample_rate - bit_depth = start_streamdetails.bit_depth - channels = start_streamdetails.channels - resample = False - sox_args = await get_sox_args_for_pcm_stream( - sample_rate, - bit_depth, - channels, - output_format=output_fmt, - ) - self.logger.debug("Multi client queue stream %s started", queue.queue_id) - try: - - # get the raw pcm bytes from the queue stream and on the fly encode to wanted format - # send the compressed/endoded stream to the client. - async with AsyncProcess(sox_args, True) as sox_proc: - - async def writer(): - """Task that sends the raw pcm audio to the sox/ffmpeg process.""" - async for audio_chunk in self._get_queue_stream( - queue, - sample_rate=sample_rate, - bit_depth=bit_depth, - channels=channels, - resample=resample, - ): - if sox_proc.closed: - return - await sox_proc.write(audio_chunk) - # write eof when last packet is received - sox_proc.write_eof() - - async def reader(): - """Read bytes from final output and put chunk on child queues.""" - chunks_sent = 0 - async for chunk in sox_proc.iterate_chunks(): - chunks_sent += 1 - coros = [] - for player_id in list(self._client_queues[queue_id].keys()): - if ( - self._client_queues[queue_id][player_id].full() - and chunks_sent >= 10 - and player_id not in self._subscribers[queue_id] - ): - # assume client did not connect at all or got disconnected somehow - self.__cleanup_client_queue(queue_id, player_id) - self._client_queues[queue_id].pop(player_id, None) - else: - coros.append( - self._client_queues[queue_id][player_id].put(chunk) - ) - await asyncio.gather(*coros) - - # launch the reader and writer - await asyncio.gather(*[writer(), reader()]) - # wait for all queues to consume their data - await asyncio.gather( - *[cq.join() for cq in self._client_queues[queue_id].values()] - ) - # send empty chunk to inform EOF - await asyncio.gather( - *[cq.put(b"") for cq in self._client_queues[queue_id].values()] - ) - - finally: - self.logger.debug("Multi client queue stream %s finished", queue.queue_id) - # cleanup - self._stream_tasks.pop(queue_id, None) - for player_id in list(self._client_queues[queue_id].keys()): - self.__cleanup_client_queue(queue_id, player_id) - - self.logger.debug("Multi client queue stream %s ended", queue.queue_id) - - def __cleanup_client_queue(self, queue_id: str, player_id: str): - """Cleanup a client queue after it completes/disconnects.""" - if client_queue := self._client_queues.get(queue_id, {}).pop(player_id, None): - for _ in range(client_queue.qsize()): - client_queue.get_nowait() - client_queue.task_done() - client_queue.put_nowait(b"") - - async def _get_queue_stream( - self, - queue: PlayerQueue, - sample_rate: int, - bit_depth: int, - channels: int = 2, - resample: bool = False, - ) -> AsyncGenerator[None, bytes]: - """Stream the PlayerQueue's tracks as constant feed of PCM raw audio.""" - bytes_written_total = 0 - last_fadeout_data = b"" - queue_index = None - track_count = 0 - prev_track: Optional[QueueItem] = None - - pcm_fmt = ContentType.from_bit_depth(bit_depth) - self.logger.info( - "Starting Queue audio stream for Queue %s (PCM format: %s - sample rate: %s)", - queue.player.name, - pcm_fmt.value, - sample_rate, - ) - - # stream queue tracks one by one - while True: - # get the (next) track in queue - track_count += 1 - if track_count == 1: - # report start of queue playback so we can calculate current track/duration etc. - queue_index, seek_position, fade_in = await queue.queue_stream_start() - else: - queue_index = await queue.queue_stream_next(queue_index) - seek_position = 0 - fade_in = 0 - queue_track = queue.get_item(queue_index) - if not queue_track: - self.logger.debug( - "Abort Queue stream %s: no (more) tracks in queue", queue.queue_id - ) - break - # get streamdetails - try: - streamdetails = await get_stream_details( - self.mass, queue_track, queue.queue_id - ) - except MediaNotFoundError as err: - self.logger.warning( - "Skip track %s due to missing streamdetails", - queue_track.name, - exc_info=err, - ) - continue - - # check the PCM samplerate/bitrate - if not resample and streamdetails.bit_depth > bit_depth: - await queue.queue_stream_signal_next() - self.logger.debug( - "Abort queue stream %s due to bit depth mismatch", queue.player.name - ) - break - if ( - not resample - and streamdetails.sample_rate > sample_rate - and streamdetails.sample_rate <= queue.max_sample_rate - ): - self.logger.debug( - "Abort queue stream %s due to sample rate mismatch", - queue.player.name, - ) - await queue.queue_stream_signal_next() - break - - # check crossfade ability - use_crossfade = queue.settings.crossfade_mode != CrossFadeMode.DISABLED - if ( - prev_track is not None - and prev_track.media_type == MediaType.TRACK - and queue_track.media_type == MediaType.TRACK - ): - prev_item = await self.mass.music.get_item_by_uri(prev_track.uri) - new_item = await self.mass.music.get_item_by_uri(queue_track.uri) - if ( - prev_item.album is not None - and new_item.album is not None - and prev_item.album == new_item.album - ): - use_crossfade = False - prev_track = queue_track - - sample_size = int(sample_rate * (bit_depth / 8) * channels) # 1 second - buffer_size = sample_size * (queue.settings.crossfade_duration or 2) - # force small buffer for radio to prevent too much lag at start - if queue_track.media_type != MediaType.TRACK: - use_crossfade = False - buffer_size = sample_size - - self.logger.info( - "Start Streaming queue track: %s (%s) for queue %s", - queue_track.uri, - queue_track.name, - queue.player.name, - ) - queue_track.streamdetails.seconds_skipped = seek_position - fade_in_part = b"" - cur_chunk = 0 - prev_chunk = None - bytes_written = 0 - # handle incoming audio chunks - async for is_last_chunk, chunk in get_media_stream( - self.mass, - streamdetails, - pcm_fmt, - resample=sample_rate, - chunk_size=buffer_size, - seek_position=seek_position, - ): - cur_chunk += 1 - - # HANDLE FIRST PART OF TRACK - if not chunk and bytes_written == 0 and is_last_chunk: - # stream error: got empy first chunk ?! - self.logger.warning("Stream error on %s", queue_track.uri) - elif cur_chunk == 1 and last_fadeout_data: - prev_chunk = chunk - del chunk - elif cur_chunk == 1 and fade_in: - # fadein first chunk - fadein_first_part = await fadein_pcm_part( - chunk, fade_in, pcm_fmt, sample_rate - ) - yield fadein_first_part - bytes_written += len(fadein_first_part) - del chunk - del fadein_first_part - elif cur_chunk <= 2 and not last_fadeout_data: - # no fadeout_part available so just pass it to the output directly - yield chunk - bytes_written += len(chunk) - del chunk - # HANDLE CROSSFADE OF PREVIOUS TRACK FADE_OUT AND THIS TRACK FADE_IN - elif cur_chunk == 2 and last_fadeout_data: - # combine the first 2 chunks and strip off silence - first_part = await strip_silence( - prev_chunk + chunk, pcm_fmt, sample_rate - ) - if len(first_part) < buffer_size: - # part is too short after the strip action?! - # so we just use the full first part - first_part = prev_chunk + chunk - fade_in_part = first_part[:buffer_size] - remaining_bytes = first_part[buffer_size:] - del first_part - # do crossfade - crossfade_part = await crossfade_pcm_parts( - fade_in_part, - last_fadeout_data, - queue.settings.crossfade_duration, - pcm_fmt, - sample_rate, - ) - # send crossfade_part - yield crossfade_part - bytes_written += len(crossfade_part) - del crossfade_part - del fade_in_part - last_fadeout_data = b"" - # also write the leftover bytes from the strip action - yield remaining_bytes - bytes_written += len(remaining_bytes) - del remaining_bytes - del chunk - prev_chunk = None # needed to prevent this chunk being sent again - # HANDLE LAST PART OF TRACK - elif prev_chunk and is_last_chunk: - # last chunk received so create the last_part - # with the previous chunk and this chunk - # and strip off silence - last_part = await strip_silence( - prev_chunk + chunk, pcm_fmt, sample_rate, True - ) - if len(last_part) < buffer_size: - # part is too short after the strip action - # so we just use the entire original data - last_part = prev_chunk + chunk - if not use_crossfade or len(last_part) < buffer_size: - # crossfading is not enabled or not enough data, - # so just pass the (stripped) audio data - if use_crossfade: - self.logger.warning( - "Not enough data for crossfade: %s", len(last_part) - ) - yield last_part - bytes_written += len(last_part) - del last_part - del chunk - else: - # handle crossfading support - # store fade section to be picked up for next track - last_fadeout_data = last_part[-buffer_size:] - remaining_bytes = last_part[:-buffer_size] - # write remaining bytes - if remaining_bytes: - yield remaining_bytes - bytes_written += len(remaining_bytes) - del last_part - del remaining_bytes - del chunk - # MIDDLE PARTS OF TRACK - else: - # middle part of the track - # keep previous chunk in memory so we have enough - # samples to perform the crossfade - if prev_chunk: - yield prev_chunk - bytes_written += len(prev_chunk) - prev_chunk = chunk - else: - prev_chunk = chunk - del chunk - # allow clients to only buffer max ~10 seconds ahead - queue_track.streamdetails.seconds_played = bytes_written / sample_size - seconds_buffered = (bytes_written_total + bytes_written) / sample_size - seconds_needed = queue.player.elapsed_time + 10 - diff = seconds_buffered - seconds_needed - track_time = queue_track.duration or 0 - if track_time > 10 and diff > 1: - await asyncio.sleep(diff) - # end of the track reached - bytes_written_total += bytes_written - self.logger.debug( - "Finished Streaming queue track: %s (%s) on queue %s", - queue_track.uri, - queue_track.name, - queue.player.name, - ) - # end of queue reached, pass last fadeout bits to final output - yield last_fadeout_data - # END OF QUEUE STREAM - self.logger.info("Queue stream for Queue %s finished.", queue.player.name) diff --git a/music_assistant/controllers/streams.py b/music_assistant/controllers/streams.py new file mode 100644 index 00000000..96fad465 --- /dev/null +++ b/music_assistant/controllers/streams.py @@ -0,0 +1,620 @@ +"""Controller to stream audio to players.""" +from __future__ import annotations + +import asyncio +import gc +import urllib.parse +from types import CoroutineType +from typing import TYPE_CHECKING, AsyncGenerator, Dict, Optional +from uuid import uuid4 + +from aiohttp import web + +from music_assistant.helpers.audio import ( + check_audio_support, + create_wave_header, + crossfade_pcm_parts, + fadein_pcm_part, + get_chunksize, + get_ffmpeg_args_for_pcm_stream, + get_media_stream, + get_preview_stream, + get_stream_details, + strip_silence, +) +from music_assistant.helpers.process import AsyncProcess +from music_assistant.models.enums import ( + ContentType, + CrossFadeMode, + EventType, + MediaType, + ProviderType, +) +from music_assistant.models.errors import MediaNotFoundError, QueueEmpty +from music_assistant.models.event import MassEvent +from music_assistant.models.player_queue import PlayerQueue +from music_assistant.models.queue_item import QueueItem + +if TYPE_CHECKING: + from music_assistant.mass import MusicAssistant + + +class StreamsController: + """Controller to stream audio to players.""" + + def __init__(self, mass: MusicAssistant): + """Initialize instance.""" + self.mass = mass + self.logger = mass.logger.getChild("stream") + self._port = mass.config.stream_port + self._ip = mass.config.stream_ip + self.queue_streams: Dict[str, QueueStream] = {} + + @property + def base_url(self) -> str: + """Return the base url for the stream engine.""" + return f"http://{self._ip}:{self._port}" + + def get_stream_url( + self, + stream_id: str, + content_type: ContentType = ContentType.FLAC, + ) -> str: + """Generate unique stream url for the PlayerQueue Stream.""" + ext = content_type.value + return f"{self.base_url}/{stream_id}.{ext}" + + async def get_preview_url(self, provider: ProviderType, track_id: str) -> str: + """Return url to short preview sample.""" + track = await self.mass.music.tracks.get_provider_item(track_id, provider) + if preview := track.metadata.preview: + return preview + enc_track_id = urllib.parse.quote(track_id) + return f"{self.base_url}/preview?provider_id={provider.value}&item_id={enc_track_id}" + + def get_silence_url(self, duration: int = 600) -> str: + """Return url to silence.""" + return f"{self.base_url}/silence?duration={duration}" + + async def setup(self) -> None: + """Async initialize of module.""" + app = web.Application() + + app.router.add_get("/preview", self.serve_preview) + app.router.add_get("/silence", self.serve_silence) + app.router.add_get("/{stream_id}.{format}", self.serve_queue_stream) + + runner = web.AppRunner(app, access_log=None) + await runner.setup() + # set host to None to bind to all addresses on both IPv4 and IPv6 + http_site = web.TCPSite(runner, host=None, port=self._port) + await http_site.start() + + async def on_shutdown_event(*event: MassEvent): + """Handle shutdown event.""" + await http_site.stop() + await runner.cleanup() + await app.shutdown() + await app.cleanup() + self.logger.info("Streamserver exited.") + + self.mass.subscribe(on_shutdown_event, EventType.SHUTDOWN) + + ffmpeg_present, libsoxr_support = await check_audio_support(True) + if not ffmpeg_present: + self.logger.error( + "FFmpeg binary not found on your system, " "playback will NOT work!." + ) + elif not libsoxr_support: + self.logger.warning( + "FFmpeg version found without libsoxr support, " + "highest quality audio not available. " + ) + + self.logger.info("Started stream server on port %s", self._port) + + @staticmethod + async def serve_silence(request: web.Request): + """Serve silence.""" + resp = web.StreamResponse( + status=200, reason="OK", headers={"Content-Type": "audio/wav"} + ) + await resp.prepare(request) + duration = int(request.query.get("duration", 600)) + await resp.write(create_wave_header(duration=duration)) + for _ in range(0, duration): + await resp.write(b"\0" * 1764000) + return resp + + async def serve_preview(self, request: web.Request): + """Serve short preview sample.""" + provider_id = request.query["provider_id"] + item_id = urllib.parse.unquote(request.query["item_id"]) + resp = web.StreamResponse( + status=200, reason="OK", headers={"Content-Type": "audio/mp3"} + ) + await resp.prepare(request) + async for chunk in get_preview_stream(self.mass, provider_id, item_id): + await resp.write(chunk) + return resp + + async def serve_queue_stream(self, request: web.Request): + """Serve queue audio stream to a single player.""" + self.logger.info(request) + self.logger.info(request.headers) + + stream_id = request.match_info["stream_id"] + queue_stream = self.queue_streams.get(stream_id) + + if queue_stream is None: + self.logger.warning("Got stream request for unknown id: %s", stream_id) + return web.Response(status=404) + + # prepare request, add some DLNA/UPNP compatible headers + headers = { + "Content-Type": f"audio/{queue_stream.output_format.value}", + "transferMode.dlna.org": "Streaming", + "Connection": "Close", + "contentFeatures.dlna.org": "DLNA.ORG_OP=00;DLNA.ORG_CI=0;DLNA.ORG_FLAGS=0d500000000000000000000000000000", + } + resp = web.StreamResponse(headers=headers) + await resp.prepare(request) + client_id = request.remote + await queue_stream.subscribe(client_id, resp.write) + + return resp + + async def start_queue_stream( + self, + queue: PlayerQueue, + expected_clients: int, + start_index: int, + seek_position: int, + fade_in: bool, + output_format: ContentType, + ) -> QueueStream: + """Start running a queue stream.""" + # cleanup stale previous queue tasks + + # generate unique stream url + stream_id = uuid4().hex + # determine the pcm details based on the first track we need to stream + try: + first_item = queue.items[start_index] + except (IndexError, TypeError) as err: + raise QueueEmpty() from err + + streamdetails = await get_stream_details(self.mass, first_item, queue.queue_id) + + # work out pcm details + if queue.settings.crossfade_mode == CrossFadeMode.ALWAYS: + pcm_sample_rate = min(96000, queue.max_sample_rate) + pcm_bit_depth = 24 + pcm_channels = 2 + pcm_resample = True + elif streamdetails.sample_rate > queue.max_sample_rate: + pcm_sample_rate = queue.max_sample_rate + pcm_bit_depth = streamdetails.bit_depth + pcm_channels = streamdetails.channels + pcm_resample = True + else: + pcm_sample_rate = streamdetails.sample_rate + pcm_bit_depth = streamdetails.bit_depth + pcm_channels = streamdetails.channels + pcm_resample = False + + self.queue_streams[stream_id] = stream = QueueStream( + queue=queue, + stream_id=stream_id, + expected_clients=expected_clients, + start_index=start_index, + seek_position=seek_position, + fade_in=fade_in, + output_format=output_format, + pcm_sample_rate=pcm_sample_rate, + pcm_bit_depth=pcm_bit_depth, + pcm_channels=pcm_channels, + pcm_resample=pcm_resample, + autostart=True, + ) + self.mass.create_task(self.cleanup_stale) + return stream + + def cleanup_stale(self) -> None: + """Cleanup stale/done stream tasks.""" + stale = set() + for stream_id, stream in self.queue_streams.items(): + if stream.done.is_set() and not stream.connected_clients: + stale.add(stream_id) + for stream_id in stale: + self.queue_streams.pop(stream_id, None) + + +class QueueStream: + """Representation of a (multisubscriber) Audio Queue stream.""" + + def __init__( + self, + queue: PlayerQueue, + stream_id: str, + expected_clients: int, + start_index: int, + seek_position: int, + fade_in: bool, + output_format: ContentType, + pcm_sample_rate: int, + pcm_bit_depth: int, + pcm_channels: int = 2, + pcm_floating_point: bool = False, + pcm_resample: bool = False, + autostart: bool = False, + ): + """Init QueueStreamJob instance.""" + self.queue = queue + self.stream_id = stream_id + self.expected_clients = expected_clients + self.start_index = start_index + self.seek_position = seek_position + self.fade_in = fade_in + self.output_format = output_format + self.pcm_sample_rate = pcm_sample_rate + self.pcm_bit_depth = pcm_bit_depth + self.pcm_channels = pcm_channels + self.pcm_floating_point = pcm_floating_point + self.pcm_resample = pcm_resample + self.url = queue.mass.streams.get_stream_url(stream_id, output_format) + + self.mass = queue.mass + self.logger = self.queue.logger.getChild("stream") + self.expected_clients = expected_clients + self.connected_clients: Dict[str, CoroutineType[bytes]] = {} + self._runner_task: Optional[asyncio.Task] = None + self.done = asyncio.Event() + self.all_clients_connected = asyncio.Event() + self.index_in_buffer = start_index + self.signal_next: bool = False + if autostart: + self.mass.create_task(self.start()) + + async def start(self) -> None: + """Start running queue stream.""" + self._runner_task = self.mass.create_task(self._queue_stream_runner()) + + async def stop(self) -> None: + """Stop running queue stream and cleanup.""" + self.done.set() + if self._runner_task and not self._runner_task.done(): + self._runner_task.cancel() + # allow some time to cleanup + await asyncio.sleep(2) + + self._runner_task = None + self.connected_clients = {} + + # run garbage collection manually due to the high number of + # processed bytes blocks + loop = asyncio.get_running_loop() + await loop.run_in_executor(None, gc.collect) + self.logger.debug("Stream job %s cleaned up", self.stream_id) + + async def subscribe(self, client_id: str, callback: CoroutineType[bytes]) -> None: + """Subscribe callback and wait for completion.""" + assert client_id not in self.connected_clients, "Client is already connected" + assert not self.done.is_set(), "Stream task is already finished" + self.connected_clients[client_id] = callback + self.logger.debug("client connected: %s", client_id) + if len(self.connected_clients) == self.expected_clients: + self.all_clients_connected.set() + try: + await self.done.wait() + finally: + self.connected_clients.pop(client_id, None) + self.logger.debug("client disconnected: %s", client_id) + if len(self.connected_clients) == 0: + # no more clients, perform cleanup + await self.stop() + + async def _queue_stream_runner(self) -> None: + """Distribute audio chunks over connected client queues.""" + ffmpeg_args = await get_ffmpeg_args_for_pcm_stream( + self.pcm_sample_rate, + self.pcm_bit_depth, + self.pcm_channels, + output_format=self.output_format, + ) + # get the raw pcm bytes from the queue stream and on the fly encode to wanted format + # send the compressed/encoded stream to the client(s). + chunk_size = get_chunksize(self.output_format) + async with AsyncProcess(ffmpeg_args, True, chunk_size) as ffmpeg_proc: + + async def writer(): + """Task that sends the raw pcm audio to the ffmpeg process.""" + async for audio_chunk in self._get_queue_stream(): + await ffmpeg_proc.write(audio_chunk) + del audio_chunk + # write eof when last packet is received + ffmpeg_proc.write_eof() + + ffmpeg_proc.attach_task(writer()) + + # wait max 5 seconds for all client(s) to connect + try: + await asyncio.wait_for(self.all_clients_connected.wait(), 5) + except asyncio.exceptions.TimeoutError: + self.logger.warning( + "Abort: client(s) did not connect within 5 seconds." + ) + self.done.set() + return + self.logger.debug("%s clients connected", len(self.connected_clients)) + + # Read bytes from final output and send chunk to child callback. + async for chunk in ffmpeg_proc.iterate_chunks(): + if len(self.connected_clients) == 0: + # no more clients + self.done.set() + self.logger.debug("Abort: all clients diconnected.") + return + for client_id in set(self.connected_clients.keys()): + try: + callback = self.connected_clients[client_id] + await callback(chunk) + except ( + ConnectionResetError, + KeyError, + BrokenPipeError, + ): + self.connected_clients.pop(client_id, None) + + del chunk + + # complete queue streamed + if self.signal_next: + # the queue stream was aborted (e.g. because of sample rate mismatch) + # tell the queue to load the next track (restart stream) as soon + # as the player finished playing and returns to idle + self.queue.signal_next = True + + # all queue data has been streamed. Either because the queue is exhausted + # or we need to restart the stream due to decoder/sample rate mismatch + # set event that this stream task is finished + # if the stream is restarted by the queue manager afterwards is controlled + # by the `signal_next` bool above. + self.done.set() + + async def _get_queue_stream( + self, + ) -> AsyncGenerator[None, bytes]: + """Stream the PlayerQueue's tracks as constant feed of PCM raw audio.""" + last_fadeout_data = b"" + queue_index = None + track_count = 0 + prev_track: Optional[QueueItem] = None + + pcm_fmt = ContentType.from_bit_depth(self.pcm_bit_depth) + self.logger.debug( + "Starting Queue audio stream for Queue %s (PCM format: %s - sample rate: %s)", + self.queue.player.name, + pcm_fmt.value, + self.pcm_sample_rate, + ) + + # stream queue tracks one by one + while True: + # get the (next) track in queue + track_count += 1 + if track_count == 1: + queue_index = self.start_index + seek_position = self.seek_position + fade_in = self.fade_in + else: + queue_index = self.queue.get_next_index(queue_index) + seek_position = 0 + fade_in = False + self.index_in_buffer = queue_index + # send signal that we've loaded a new track into the buffer + self.queue.signal_update() + queue_track = self.queue.get_item(queue_index) + if not queue_track: + self.logger.debug( + "Abort Queue stream %s: no (more) tracks in queue", + self.queue.queue_id, + ) + break + # get streamdetails + try: + streamdetails = await get_stream_details( + self.mass, queue_track, self.queue.queue_id + ) + except MediaNotFoundError as err: + self.logger.warning( + "Skip track %s due to missing streamdetails", + queue_track.name, + exc_info=err, + ) + continue + + if queue_track.name == "alert": + self.pcm_resample = True + + # check the PCM samplerate/bitrate + if not self.pcm_resample and streamdetails.bit_depth > self.pcm_bit_depth: + self.signal_next = True + self.logger.debug( + "Abort queue stream %s due to bit depth mismatch", + self.queue.player.name, + ) + break + if ( + not self.pcm_resample + and streamdetails.sample_rate > self.pcm_sample_rate + and streamdetails.sample_rate <= self.queue.max_sample_rate + ): + self.logger.debug( + "Abort queue stream %s due to sample rate mismatch", + self.queue.player.name, + ) + self.signal_next = True + break + + # check crossfade ability + use_crossfade = self.queue.settings.crossfade_mode != CrossFadeMode.DISABLED + if ( + prev_track is not None + and prev_track.media_type == MediaType.TRACK + and queue_track.media_type == MediaType.TRACK + ): + prev_item = await self.mass.music.get_item_by_uri(prev_track.uri) + new_item = await self.mass.music.get_item_by_uri(queue_track.uri) + if ( + prev_item.album is not None + and new_item.album is not None + and prev_item.album == new_item.album + ): + use_crossfade = False + prev_track = queue_track + + sample_size = int( + self.pcm_sample_rate * (self.pcm_bit_depth / 8) * self.pcm_channels + ) # 1 second + buffer_size = sample_size * (self.queue.settings.crossfade_duration or 2) + # force small buffer for radio to prevent too much lag at start + if queue_track.media_type != MediaType.TRACK: + use_crossfade = False + buffer_size = sample_size + + self.logger.info( + "Start Streaming queue track: %s (%s) for queue %s", + queue_track.uri, + queue_track.name, + self.queue.player.name, + ) + queue_track.streamdetails.seconds_skipped = seek_position + fade_in_part = b"" + cur_chunk = 0 + prev_chunk = None + bytes_written = 0 + # handle incoming audio chunks + async for chunk in get_media_stream( + self.mass, + streamdetails, + pcm_fmt, + pcm_sample_rate=self.pcm_sample_rate, + chunk_size=buffer_size, + seek_position=seek_position, + ): + cur_chunk += 1 + is_last_chunk = len(chunk) < buffer_size + + # HANDLE FIRST PART OF TRACK + if len(chunk) == 0 and bytes_written == 0 and is_last_chunk: + # stream error: got empy first chunk ?! + self.logger.warning("Stream error on %s", queue_track.uri) + elif cur_chunk == 1 and last_fadeout_data: + prev_chunk = chunk + del chunk + elif cur_chunk == 1 and fade_in: + # fadein first chunk + fadein_first_part = await fadein_pcm_part( + chunk, fade_in, pcm_fmt, self.pcm_sample_rate + ) + yield fadein_first_part + bytes_written += len(fadein_first_part) + del chunk + del fadein_first_part + elif cur_chunk <= 2 and not last_fadeout_data: + # no fadeout_part available so just pass it to the output directly + yield chunk + bytes_written += len(chunk) + del chunk + # HANDLE CROSSFADE OF PREVIOUS TRACK FADE_OUT AND THIS TRACK FADE_IN + elif cur_chunk == 2 and last_fadeout_data: + # combine the first 2 chunks and strip off silence + first_part = await strip_silence( + prev_chunk + chunk, pcm_fmt, self.pcm_sample_rate + ) + if len(first_part) < buffer_size: + # part is too short after the strip action?! + # so we just use the full first part + first_part = prev_chunk + chunk + fade_in_part = first_part[:buffer_size] + remaining_bytes = first_part[buffer_size:] + del first_part + # do crossfade + crossfade_part = await crossfade_pcm_parts( + fade_in_part, + last_fadeout_data, + self.queue.settings.crossfade_duration, + pcm_fmt, + self.pcm_sample_rate, + ) + # send crossfade_part + yield crossfade_part + bytes_written += len(crossfade_part) + del crossfade_part + del fade_in_part + last_fadeout_data = b"" + # also write the leftover bytes from the strip action + yield remaining_bytes + bytes_written += len(remaining_bytes) + del remaining_bytes + del chunk + prev_chunk = None # needed to prevent this chunk being sent again + # HANDLE LAST PART OF TRACK + elif prev_chunk and is_last_chunk: + # last chunk received so create the last_part + # with the previous chunk and this chunk + # and strip off silence + last_part = await strip_silence( + prev_chunk + chunk, pcm_fmt, self.pcm_sample_rate, True + ) + if len(last_part) < buffer_size: + # part is too short after the strip action + # so we just use the entire original data + last_part = prev_chunk + chunk + if not use_crossfade or len(last_part) < buffer_size: + if use_crossfade: + self.logger.debug("not enough data for crossfade") + # crossfading is not enabled or not enough data, + # so just pass the (stripped) audio data + yield last_part + bytes_written += len(last_part) + del last_part + del chunk + else: + # handle crossfading support + # store fade section to be picked up for next track + last_fadeout_data = last_part[-buffer_size:] + remaining_bytes = last_part[:-buffer_size] + # write remaining bytes + if remaining_bytes: + yield remaining_bytes + bytes_written += len(remaining_bytes) + del last_part + del remaining_bytes + del chunk + # MIDDLE PARTS OF TRACK + else: + # middle part of the track + # keep previous chunk in memory so we have enough + # samples to perform the crossfade + if prev_chunk: + yield prev_chunk + bytes_written += len(prev_chunk) + prev_chunk = chunk + else: + prev_chunk = chunk + del chunk + # end of the track reached + queue_track.streamdetails.seconds_streamed = bytes_written / sample_size + self.logger.debug( + "Finished Streaming queue track: %s (%s) on queue %s", + queue_track.uri, + queue_track.name, + self.queue.player.name, + ) + # end of queue reached, pass last fadeout bits to final output + yield last_fadeout_data + del last_fadeout_data + # END OF QUEUE STREAM + self.logger.debug("Queue stream for Queue %s finished.", self.queue.player.name) diff --git a/music_assistant/helpers/audio.py b/music_assistant/helpers/audio.py index d12d5afd..41adbeb5 100644 --- a/music_assistant/helpers/audio.py +++ b/music_assistant/helpers/audio.py @@ -3,29 +3,30 @@ from __future__ import annotations import asyncio import logging +import os +import re import struct from io import BytesIO +from time import time from typing import TYPE_CHECKING, AsyncGenerator, List, Optional, Tuple import aiofiles from music_assistant.helpers.process import AsyncProcess, check_output from music_assistant.helpers.util import create_tempfile -from music_assistant.models.enums import EventType, ProviderType -from music_assistant.models.errors import AudioError, MediaNotFoundError -from music_assistant.models.event import MassEvent -from music_assistant.models.media_items import ( - ContentType, - MediaType, - StreamDetails, - StreamType, +from music_assistant.models.enums import ProviderType +from music_assistant.models.errors import ( + AudioError, + MediaNotFoundError, + MusicAssistantError, ) +from music_assistant.models.media_items import ContentType, MediaType, StreamDetails if TYPE_CHECKING: from music_assistant.mass import MusicAssistant from music_assistant.models.player_queue import QueueItem -LOGGER = logging.getLogger("audio") +LOGGER = logging.getLogger(__name__) # pylint:disable=consider-using-f-string @@ -37,60 +38,36 @@ async def crossfade_pcm_parts( fmt: ContentType, sample_rate: int, ) -> bytes: - """Crossfade two chunks of pcm/raw audio using sox.""" - _, ffmpeg_present = await check_audio_support() - - # prefer ffmpeg implementation (due to simplicity) - if ffmpeg_present: - fadeoutfile = create_tempfile() - async with aiofiles.open(fadeoutfile.name, "wb") as outfile: - await outfile.write(fade_out_part) - # input args - args = ["ffmpeg", "-hide_banner", "-loglevel", "error"] - args += [ - "-f", - fmt.value, - "-ac", - "2", - "-ar", - str(sample_rate), - "-i", - fadeoutfile.name, - ] - args += ["-f", fmt.value, "-ac", "2", "-ar", str(sample_rate), "-i", "-"] - # filter args - args += ["-filter_complex", f"[0][1]acrossfade=d={fade_length}"] - # output args - args += ["-f", fmt.value, "-"] - async with AsyncProcess(args, True) as proc: - crossfade_data, _ = await proc.communicate(fade_in_part) - return crossfade_data - - # sox based implementation - sox_args = [fmt.sox_format(), "-c", "2", "-r", str(sample_rate)] - # create fade-in part - fadeinfile = create_tempfile() - args = ["sox", "--ignore-length", "-t"] + sox_args - args += ["-", "-t"] + sox_args + [fadeinfile.name, "fade", "t", str(fade_length)] - async with AsyncProcess(args, enable_write=True) as sox_proc: - await sox_proc.communicate(fade_in_part) - # create fade-out part + """Crossfade two chunks of pcm/raw audio using ffmpeg.""" fadeoutfile = create_tempfile() - args = ["sox", "--ignore-length", "-t"] + sox_args + ["-", "-t"] + sox_args - args += [fadeoutfile.name, "reverse", "fade", "t", str(fade_length), "reverse"] - async with AsyncProcess(args, enable_write=True) as sox_proc: - await sox_proc.communicate(fade_out_part) - # create crossfade using sox and some temp files - # TODO: figure out how to make this less complex and without the tempfiles - args = ["sox", "-m", "-v", "1.0", "-t"] + sox_args + [fadeoutfile.name, "-v", "1.0"] - args += ["-t"] + sox_args + [fadeinfile.name, "-t"] + sox_args + ["-"] - async with AsyncProcess(args, enable_write=False) as sox_proc: - crossfade_part, _ = await sox_proc.communicate() - fadeinfile.close() - fadeoutfile.close() - del fadeinfile - del fadeoutfile - return crossfade_part + async with aiofiles.open(fadeoutfile.name, "wb") as outfile: + await outfile.write(fade_out_part) + # input args + args = ["ffmpeg", "-hide_banner", "-loglevel", "error"] + args += [ + "-f", + fmt.value, + "-ac", + "2", + "-ar", + str(sample_rate), + "-i", + fadeoutfile.name, + ] + args += ["-f", fmt.value, "-ac", "2", "-ar", str(sample_rate), "-i", "-"] + # filter args + args += ["-filter_complex", f"[0][1]acrossfade=d={fade_length}"] + # output args + args += ["-f", fmt.value, "-"] + async with AsyncProcess(args, True) as proc: + crossfade_data, _ = await proc.communicate(fade_in_part) + LOGGER.debug( + "crossfaded 2 pcm chunks. fade_in_part: %s - fade_out_part: %s - result: %s", + len(fade_in_part), + len(fade_out_part), + len(crossfade_data), + ) + return crossfade_data async def fadein_pcm_part( @@ -125,34 +102,24 @@ async def strip_silence( audio_data: bytes, fmt: ContentType, sample_rate: int, reverse=False ) -> bytes: """Strip silence from (a chunk of) pcm audio.""" - _, ffmpeg_present = await check_audio_support() - # prefer ffmpeg implementation - if ffmpeg_present: - # input args - args = ["ffmpeg", "-hide_banner", "-loglevel", "error"] - args += ["-f", fmt.value, "-ac", "2", "-ar", str(sample_rate), "-i", "-"] - # filter args - if reverse: - args += ["-af", "areverse,silenceremove=1:0:-50dB:detection=peak,areverse"] - else: - args += ["-af", "silenceremove=1:0:-50dB:detection=peak"] - # output args - args += ["-f", fmt.value, "-"] - async with AsyncProcess(args, True) as proc: - stripped_data, _ = await proc.communicate(audio_data) - return stripped_data - - # sox implementation - sox_args = [fmt.sox_format(), "-c", "2", "-r", str(sample_rate)] - args = ["sox", "--ignore-length", "-t"] + sox_args + ["-", "-t"] + sox_args + ["-"] - if reverse: - args.append("reverse") - args += ["silence", "1", "0.1", "1%"] + # input args + args = ["ffmpeg", "-hide_banner", "-loglevel", "error"] + args += ["-f", fmt.value, "-ac", "2", "-ar", str(sample_rate), "-i", "-"] + # filter args if reverse: - args.append("reverse") - async with AsyncProcess(args, enable_write=True) as sox_proc: - stripped_data, _ = await sox_proc.communicate(audio_data) - return stripped_data + args += ["-af", "areverse,silenceremove=1:0:-50dB:detection=peak,areverse"] + else: + args += ["-af", "silenceremove=1:0:-50dB:detection=peak"] + # output args + args += ["-f", fmt.value, "-"] + async with AsyncProcess(args, True) as proc: + stripped_data, _ = await proc.communicate(audio_data) + LOGGER.debug( + "stripped silence of pcm chunk. size before: %s - after: %s", + len(audio_data), + len(stripped_data), + ) + return stripped_data async def analyze_audio(mass: MusicAssistant, streamdetails: StreamDetails) -> None: @@ -162,57 +129,62 @@ async def analyze_audio(mass: MusicAssistant, streamdetails: StreamDetails) -> N # only when needed we do the analyze job return - LOGGER.debug( - "Start analyzing track %s/%s", - streamdetails.provider.value, - streamdetails.item_id, - ) + LOGGER.debug("Start analyzing track %s", streamdetails.uri) # calculate BS.1770 R128 integrated loudness with ffmpeg - if streamdetails.type == StreamType.EXECUTABLE: - proc_args = ( - "%s | ffmpeg -i pipe: -af ebur128=framelog=verbose -f null - 2>&1 | awk '/I:/{print $2}'" - % streamdetails.path - ) - else: - proc_args = ( - "ffmpeg -i '%s' -af ebur128=framelog=verbose -f null - 2>&1 | awk '/I:/{print $2}'" - % streamdetails.path - ) - audio_data = b"" - if streamdetails.media_type == MediaType.RADIO: - proc_args = "ffmpeg -i pipe: -af ebur128=framelog=verbose -f null - 2>&1 | awk '/I:/{print $2}'" - # for radio we collect ~10 minutes of audio data to process - async with mass.http_session.get(streamdetails.path) as response: - async for chunk, _ in response.content.iter_chunks(): - audio_data += chunk - if len(audio_data) >= 20000: + started = time() + proc_args = [ + "ffmpeg", + "-f", + streamdetails.content_type.value, + "-i", + "-", + "-af", + "ebur128=framelog=verbose", + "-f", + "null", + "-", + ] + async with AsyncProcess(proc_args, True, use_stderr=True) as ffmpeg_proc: + + async def writer(): + """Task that grabs the source audio and feeds it to ffmpeg.""" + music_prov = mass.music.get_provider(streamdetails.provider) + async for audio_chunk in music_prov.get_audio_stream(streamdetails): + await ffmpeg_proc.write(audio_chunk) + if (time() - started) > 300: + # just in case of endless radio stream etc break + ffmpeg_proc.write_eof() - proc = await asyncio.create_subprocess_shell( - proc_args, - stdout=asyncio.subprocess.PIPE, - stdin=asyncio.subprocess.PIPE if audio_data else None, - ) - stdout, _ = await proc.communicate(audio_data or None) - try: - loudness = float(stdout.decode().strip()) - except (ValueError, AttributeError): - LOGGER.warning( - "Could not determine integrated loudness of %s/%s - %s", - streamdetails.provider.value, - streamdetails.item_id, - stdout.decode() or "received empty value", - ) - else: - await mass.music.set_track_loudness( - streamdetails.item_id, streamdetails.provider, loudness - ) - LOGGER.debug( - "Integrated loudness of %s/%s is: %s", - streamdetails.provider.value, - streamdetails.item_id, - loudness, - ) + writer_task = ffmpeg_proc.attach_task(writer()) + # wait for the writer task to finish + await writer_task + + _, stderr = await ffmpeg_proc.communicate() + try: + loudness_str = ( + stderr.decode() + .split("Integrated loudness")[1] + .split("I:")[1] + .split("LUFS")[0] + ) + loudness = float(loudness_str.strip()) + except (ValueError, AttributeError): + LOGGER.warning( + "Could not determine integrated loudness of %s - %s", + streamdetails.uri, + stderr.decode() or "received empty value", + ) + else: + streamdetails.loudness = loudness + await mass.music.set_track_loudness( + streamdetails.item_id, streamdetails.provider, loudness + ) + LOGGER.debug( + "Integrated loudness of %s is: %s", + streamdetails.uri, + loudness, + ) async def get_stream_details( @@ -226,16 +198,17 @@ async def get_stream_details( param media_item: The MediaItem (track/radio) for which to request the streamdetails for. param queue_id: Optionally provide the queue_id which will play this stream. """ - if not queue_item.media_item: - # special case: a plain url was added to the queue - streamdetails = StreamDetails( - type=StreamType.URL, - provider=ProviderType.URL, - item_id=queue_item.item_id, - path=queue_item.uri, - content_type=ContentType.try_parse(queue_item.uri), - ) + if queue_item.streamdetails and (time() < queue_item.streamdetails.expires): + # we already have fresh streamdetails, use these + queue_item.streamdetails.seconds_skipped = 0 + queue_item.streamdetails.seconds_streamed = 0 + streamdetails = queue_item.streamdetails + elif queue_item.media_type == MediaType.URL: + # handle URL provider items + url_prov = mass.music.get_provider(ProviderType.URL) + streamdetails = await url_prov.get_stream_details(queue_item.uri) else: + # media item: fetch streamdetails from provider # always request the full db track as there might be other qualities available full_item = await mass.music.get_item_by_uri(queue_item.uri) # sort by quality and check track availability @@ -248,29 +221,28 @@ async def get_stream_details( music_prov = mass.music.get_provider(prov_media.prov_id) if not music_prov or not music_prov.available: continue # provider temporary unavailable ? - - streamdetails: StreamDetails = await music_prov.get_stream_details( - prov_media.item_id - ) - if streamdetails: - try: - streamdetails.content_type = ContentType(streamdetails.content_type) - except KeyError: - LOGGER.warning("Invalid content type!") - else: - break + try: + streamdetails: StreamDetails = await music_prov.get_stream_details( + prov_media.item_id + ) + streamdetails.content_type = ContentType(streamdetails.content_type) + except MusicAssistantError as err: + LOGGER.warning(str(err)) + else: + break if not streamdetails: raise MediaNotFoundError(f"Unable to retrieve streamdetails for {queue_item}") - # set player_id on the streamdetails so we know what players stream + # set queue_id on the streamdetails so we know what is being streamed streamdetails.queue_id = queue_id # get gain correct / replaygain - loudness, gain_correct = await get_gain_correct( - mass, queue_id, streamdetails.item_id, streamdetails.provider - ) - streamdetails.gain_correct = gain_correct - streamdetails.loudness = loudness + if not streamdetails.gain_correct: + loudness, gain_correct = await get_gain_correct(mass, streamdetails) + streamdetails.gain_correct = gain_correct + streamdetails.loudness = loudness + if not streamdetails.duration: + streamdetails.duration = queue_item.duration # set streamdetails as attribute on the media_item # this way the app knows what content is playing queue_item.streamdetails = streamdetails @@ -278,17 +250,23 @@ async def get_stream_details( async def get_gain_correct( - mass: MusicAssistant, queue_id: str, item_id: str, provider: ProviderType + mass: MusicAssistant, streamdetails: StreamDetails ) -> Tuple[float, float]: """Get gain correction for given queue / track combination.""" - queue = mass.players.get_player_queue(queue_id) + queue = mass.players.get_player_queue(streamdetails.queue_id) if not queue or not queue.settings.volume_normalization_enabled: return (0, 0) + if streamdetails.gain_correct is not None: + return (streamdetails.loudness, streamdetails.gain_correct) target_gain = queue.settings.volume_normalization_target - track_loudness = await mass.music.get_track_loudness(item_id, provider) + track_loudness = await mass.music.get_track_loudness( + streamdetails.item_id, streamdetails.provider + ) if track_loudness is None: # fallback to provider average - fallback_track_loudness = await mass.music.get_provider_loudness(provider) + fallback_track_loudness = await mass.music.get_provider_loudness( + streamdetails.provider + ) if fallback_track_loudness is None: # fallback to some (hopefully sane) average value for now fallback_track_loudness = -8.5 @@ -353,218 +331,249 @@ def create_wave_header(samplerate=44100, channels=2, bitspersample=16, duration= return file.getvalue() -async def get_sox_args( +async def get_ffmpeg_args( streamdetails: StreamDetails, output_format: Optional[ContentType] = None, - resample: Optional[int] = None, - seek_position: Optional[int] = None, + pcm_sample_rate: Optional[int] = None, + pcm_channels: int = 2, ) -> List[str]: - """Collect all args to send to the sox (or ffmpeg) process.""" - stream_path = streamdetails.path - stream_type = StreamType(streamdetails.type) + """Collect all args to send to the ffmpeg process.""" input_format = streamdetails.content_type if output_format is None: output_format = input_format - sox_present, ffmpeg_present = await check_audio_support() - use_ffmpeg = not sox_present or not input_format.sox_supported() or seek_position + ffmpeg_present, libsoxr_support = await check_audio_support() - # use ffmpeg if content not supported by SoX (e.g. AAC radio streams) - if use_ffmpeg: - if not ffmpeg_present: - raise AudioError( - "FFmpeg binary is missing from system." - "Please install ffmpeg on your OS to enable playback.", - ) - # collect input args - if stream_type == StreamType.EXECUTABLE: - # stream from executable - input_args = [ - stream_path, - "|", - "ffmpeg", - "-hide_banner", - "-loglevel", - "error", - "-f", - input_format.value, - "-i", - "-", - ] - else: - input_args = [ - "ffmpeg", - "-hide_banner", - "-loglevel", - "error", - "-i", - stream_path, - ] - if seek_position: - input_args += ["-ss", str(seek_position)] - # collect output args - if output_format.is_pcm(): - output_args = [ - "-f", - output_format.value, - "-c:a", - output_format.name.lower(), - "-", - ] - else: - output_args = ["-f", output_format.value, "-"] - # collect filter args - filter_args = [] - if streamdetails.gain_correct: - filter_args += ["-filter:a", f"volume={streamdetails.gain_correct}dB"] - if resample or input_format.is_pcm(): - filter_args += ["-ar", str(resample)] - return input_args + filter_args + output_args - - # Prefer SoX for all other (=highest quality) - if stream_type == StreamType.EXECUTABLE: - # stream from executable - input_args = [ - stream_path, - "|", - "sox", - "-t", - input_format.sox_format(), - ] - if input_format.is_pcm(): - input_args += [ - "-r", - str(streamdetails.sample_rate), - "-c", - str(streamdetails.channels), - ] - input_args.append("-") - else: - input_args = ["sox", "-t", input_format.sox_format(), stream_path] + if not ffmpeg_present: + raise AudioError( + "FFmpeg binary is missing from system." + "Please install ffmpeg on your OS to enable playback.", + ) + # collect input args + input_args = [ + "ffmpeg", + "-hide_banner", + "-loglevel", + "error", + "-ignore_unknown", + ] + if streamdetails.content_type != ContentType.UNKNOWN: + input_args += ["-f", input_format.value] + input_args += ["-i", "-"] # collect output args if output_format.is_pcm(): - output_args = ["-t", output_format.sox_format(), "-c", "2", "-"] - elif output_format == ContentType.FLAC: - output_args = ["-t", "flac", "-C", "0", "-"] + output_args = [ + "-acodec", + output_format.name.lower(), + "-f", + output_format.value, + "-ac", + str(pcm_channels), + "-ar", + str(pcm_sample_rate), + "-", + ] else: - output_args = ["-t", output_format.sox_format(), "-"] - # collect filter args - filter_args = [] + output_args = ["-f", output_format.value, "-"] + # collect extra and filter args + extra_args = [] + filter_params = [] if streamdetails.gain_correct: - filter_args += ["vol", str(streamdetails.gain_correct), "dB"] - if resample and streamdetails.media_type != MediaType.RADIO: - # use extra high quality resampler only if it makes sense - filter_args += ["rate", "-v", str(resample)] - elif resample: - filter_args += ["rate", str(resample)] - return input_args + output_args + filter_args + filter_params.append(f"volume={streamdetails.gain_correct}dB") + if ( + pcm_sample_rate is not None + and streamdetails.sample_rate != pcm_sample_rate + and libsoxr_support + and streamdetails.media_type == MediaType.TRACK + ): + # prefer libsoxr high quality resampler (if present) for sample rate conversions + filter_params.append("aresample=resampler=soxr") + if filter_params: + extra_args += ["-af", ",".join(filter_params)] + + if pcm_sample_rate is not None and not output_format.is_pcm(): + extra_args += ["-ar", str(pcm_sample_rate)] + + return input_args + extra_args + output_args async def get_media_stream( mass: MusicAssistant, streamdetails: StreamDetails, output_format: Optional[ContentType] = None, - resample: Optional[int] = None, + pcm_sample_rate: Optional[int] = None, chunk_size: Optional[int] = None, - seek_position: Optional[int] = None, -) -> AsyncGenerator[Tuple[bool, bytes], None]: + seek_position: int = 0, +) -> AsyncGenerator[bytes, None]: """Get the audio stream for the given streamdetails.""" - if chunk_size is None: - if streamdetails.content_type in ( - ContentType.AAC, - ContentType.M4A, - ContentType.MP3, - ContentType.OGG, - ): - chunk_size = 32000 - else: - chunk_size = 256000 - - mass.signal_event( - MassEvent( - EventType.STREAM_STARTED, - object_id=streamdetails.provider.value, - data=streamdetails, - ) - ) - args = await get_sox_args(streamdetails, output_format, resample, seek_position) - async with AsyncProcess(args) as sox_proc: + args = await get_ffmpeg_args(streamdetails, output_format, pcm_sample_rate) + async with AsyncProcess( + args, enable_write=True, chunk_size=chunk_size + ) as ffmpeg_proc: LOGGER.debug( - "start media stream for: %s/%s (%s)", - streamdetails.provider, - streamdetails.item_id, - streamdetails.type, + "start media stream for: %s, using args: %s", streamdetails.uri, str(args) ) + async def writer(): + """Task that grabs the source audio and feeds it to ffmpeg.""" + LOGGER.debug("writer started for %s", streamdetails.uri) + music_prov = mass.music.get_provider(streamdetails.provider) + async for audio_chunk in music_prov.get_audio_stream( + streamdetails, seek_position + ): + await ffmpeg_proc.write(audio_chunk) + # write eof when last packet is received + ffmpeg_proc.write_eof() + LOGGER.debug("writer finished for %s", streamdetails.uri) + + ffmpeg_proc.attach_task(writer()) + # yield chunks from stdout - # we keep 1 chunk behind to detect end of stream properly try: - prev_chunk = b"" - async for chunk in sox_proc.iterate_chunks(chunk_size): - if prev_chunk: - yield (False, prev_chunk) - prev_chunk = chunk - # send last chunk - yield (True, prev_chunk) + async for chunk in ffmpeg_proc.iterate_chunks(): + yield chunk except (asyncio.CancelledError, GeneratorExit) as err: - LOGGER.debug( - "media stream aborted for: %s/%s", - streamdetails.provider, - streamdetails.item_id, - ) + LOGGER.debug("media stream aborted for: %s", streamdetails.uri) raise err else: - LOGGER.debug( - "finished media stream for: %s/%s", - streamdetails.provider, - streamdetails.item_id, - ) + LOGGER.debug("finished media stream for: %s", streamdetails.uri) await mass.music.mark_item_played( streamdetails.item_id, streamdetails.provider ) + finally: # send analyze job to background worker - if ( - streamdetails.loudness is None - and streamdetails.provider != ProviderType.URL - ): - uri = f"{streamdetails.provider.value}://{streamdetails.media_type.value}/{streamdetails.item_id}" + if streamdetails.loudness is None: mass.add_job( - analyze_audio(mass, streamdetails), f"Analyze audio for {uri}" - ) - finally: - mass.signal_event( - MassEvent( - EventType.STREAM_ENDED, - object_id=streamdetails.provider.value, - data=streamdetails, + analyze_audio(mass, streamdetails), + f"Analyze audio for {streamdetails.uri}", ) + + +async def get_radio_stream( + mass: MusicAssistant, url: str, streamdetails: StreamDetails +) -> AsyncGenerator[bytes, None]: + """Get radio audio stream from HTTP, including metadata retrieval.""" + headers = {"Icy-MetaData": "1"} + while True: + # in loop to reconnect on connection failure + LOGGER.debug("radio stream (re)connecting to: %s", url) + async with mass.http_session.get(url, headers=headers) as resp: + headers = resp.headers + meta_int = int(headers.get("icy-metaint", "0")) + # stream with ICY Metadata + if meta_int: + while True: + audio_chunk = await resp.content.readexactly(meta_int) + yield audio_chunk + meta_byte = await resp.content.readexactly(1) + meta_length = ord(meta_byte) * 16 + meta_data = await resp.content.readexactly(meta_length) + if not meta_data: + continue + meta_data = meta_data.rstrip(b"\0") + stream_title = re.search(rb"StreamTitle='([^']*)';", meta_data) + if not stream_title: + continue + stream_title = stream_title.group(1).decode() + if stream_title != streamdetails.stream_title: + streamdetails.stream_title = stream_title + if queue := mass.players.get_player_queue( + streamdetails.queue_id + ): + queue.signal_update() + # Regular HTTP stream + else: + async for chunk in resp.content.iter_any(): + yield chunk + + +async def get_http_stream( + mass: MusicAssistant, + url: str, + streamdetails: StreamDetails, + seek_position: int = 0, +) -> AsyncGenerator[bytes, None]: + """Get audio stream from HTTP.""" + if seek_position: + assert streamdetails.duration, "Duration required for seek requests" + chunk_size = get_chunksize(streamdetails.content_type) + # try to get filesize with a head request + if seek_position and not streamdetails.size: + async with mass.http_session.head(url) as resp: + if size := resp.headers.get("Content-Length"): + streamdetails.size = int(size) + # headers + headers = {} + skip_bytes = 0 + if seek_position and streamdetails.size: + skip_bytes = int(streamdetails.size / streamdetails.duration * seek_position) + headers["Range"] = f"bytes={skip_bytes}-" + + # start the streaming from http + buffer = b"" + buffer_all = False + bytes_received = 0 + async with mass.http_session.get(url, headers=headers) as resp: + is_partial = resp.status == 206 + buffer_all = seek_position and not is_partial + async for chunk in resp.content.iter_chunked(chunk_size): + bytes_received += len(chunk) + if buffer_all and not skip_bytes: + buffer += chunk + continue + if not is_partial and skip_bytes and bytes_received < skip_bytes: + continue + yield chunk + + # store size on streamdetails for later use + if not streamdetails.size: + streamdetails.size = bytes_received + if buffer_all: + skip_bytes = streamdetails.size / streamdetails.duration * seek_position + yield buffer[:skip_bytes] + del buffer + + +async def get_file_stream( + mass: MusicAssistant, + filename: str, + streamdetails: StreamDetails, + seek_position: int = 0, +) -> AsyncGenerator[bytes, None]: + """Get audio stream from local accessible file.""" + if seek_position: + assert streamdetails.duration, "Duration required for seek requests" + if not streamdetails.size: + stat = await mass.loop.run_in_executor(None, os.stat, filename) + streamdetails.size = stat.st_size + chunk_size = get_chunksize(streamdetails.content_type) + async with aiofiles.open(streamdetails.data, "rb") as _file: + if seek_position: + seek_pos = int( + (streamdetails.size / streamdetails.duration) * seek_position ) + await _file.seek(seek_pos) + # yield chunks of data from file + while True: + data = await _file.read(chunk_size) + if not data: + break + yield data -async def check_audio_support(try_install: bool = False) -> Tuple[bool, bool, bool]: - """Check if sox and/or ffmpeg are present.""" +async def check_audio_support(try_install: bool = False) -> Tuple[bool, bool]: + """Check if ffmpeg is present (with/without libsoxr support).""" cache_key = "audio_support_cache" if cache := globals().get(cache_key): return cache - # check for SoX presence - returncode, output = await check_output("sox --version") - sox_present = returncode == 0 and "SoX" in output.decode() - if not sox_present and try_install: - # try a few common ways to install SoX - # this all assumes we have enough rights and running on a linux based platform (or docker) - await check_output("apt-get update && apt-get install sox libsox-fmt-all") - await check_output("apk add sox") - # test again - returncode, output = await check_output("sox --version") - sox_present = returncode == 0 and "SoX" in output.decode() # check for FFmpeg presence returncode, output = await check_output("ffmpeg -version") ffmpeg_present = returncode == 0 and "FFmpeg" in output.decode() if not ffmpeg_present and try_install: - # try a few common ways to install SoX + # try a few common ways to install ffmpeg # this all assumes we have enough rights and running on a linux based platform (or docker) await check_output("apt-get update && apt-get install ffmpeg") await check_output("apk add ffmpeg") @@ -573,67 +582,35 @@ async def check_audio_support(try_install: bool = False) -> Tuple[bool, bool, bo ffmpeg_present = returncode == 0 and "FFmpeg" in output.decode() # use globals as in-memory cache - result = (sox_present, ffmpeg_present) + libsoxr_support = "enable-libsoxr" in output.decode() + result = (ffmpeg_present, libsoxr_support) globals()[cache_key] = result return result -async def get_sox_args_for_pcm_stream( +async def get_ffmpeg_args_for_pcm_stream( sample_rate: int, bit_depth: int, channels: int, floating_point: bool = False, output_format: ContentType = ContentType.FLAC, ) -> List[str]: - """Collect args for sox (or ffmpeg) when converting from raw pcm to another contenttype.""" - - sox_present, ffmpeg_present = await check_audio_support() + """Collect args for ffmpeg when converting from raw pcm to another contenttype.""" input_format = ContentType.from_bit_depth(bit_depth, floating_point) - sox_present = True - - # use ffmpeg if sox is not present - if not sox_present: - if not ffmpeg_present: - raise AudioError( - "FFmpeg binary is missing from system. " - "Please install ffmpeg on your OS to enable playback.", - ) - # collect input args - input_args = ["ffmpeg", "-hide_banner", "-loglevel", "error"] - input_args += [ - "-f", - input_format.value, - "-ac", - str(channels), - "-ar", - str(sample_rate), - "-i", - "-", - ] - # collect output args - output_args = ["-f", output_format.value, "-"] - return input_args + output_args - - # Prefer SoX for all other (=highest quality) - # collect input args - input_args = [ - "sox", - "-t", - input_format.sox_format(), - "-r", - str(sample_rate), - "-b", - str(bit_depth), - "-c", + input_args = ["ffmpeg", "-hide_banner", "-loglevel", "error", "-ignore_unknown"] + input_args += [ + "-f", + input_format.value, + "-ac", str(channels), + "-ar", + str(sample_rate), + "-i", "-", ] - # collect output args - if output_format == ContentType.FLAC: - output_args = ["-t", "flac", "-C", "0", "-"] - else: - output_args = ["-t", output_format.sox_format(), "-"] + # collect output args + output_args = ["-f", output_format.value, "-"] return input_args + output_args @@ -641,53 +618,53 @@ async def get_preview_stream( mass: MusicAssistant, provider_id: str, track_id: str, -) -> AsyncGenerator[Tuple[bool, bytes], None]: - """Get the audio stream for the given streamdetails.""" +) -> AsyncGenerator[bytes, None]: + """Create a 30 seconds preview audioclip for the given streamdetails.""" music_prov = mass.music.get_provider(provider_id) streamdetails = await music_prov.get_stream_details(track_id) - if streamdetails.type == StreamType.EXECUTABLE: - # stream from executable - input_args = [ - streamdetails.path, - "|", - "ffmpeg", - "-hide_banner", - "-loglevel", - "error", - "-f", - streamdetails.content_type.value, - "-i", - "-", - ] - else: - input_args = [ - "ffmpeg", - "-hide_banner", - "-loglevel", - "error", - "-i", - streamdetails.path, - ] - output_args = ["-ss", "30", "-to", "60", "-f", "mp3", "-"] - async with AsyncProcess(input_args + output_args) as proc: + input_args = [ + "ffmpeg", + "-hide_banner", + "-loglevel", + "error", + "-f", + streamdetails.content_type.value, + "-i", + "-", + ] + output_args = ["-to", "30", "-f", "mp3", "-"] + args = input_args + output_args + async with AsyncProcess(args, True) as ffmpeg_proc: + + async def writer(): + """Task that grabs the source audio and feeds it to ffmpeg.""" + music_prov = mass.music.get_provider(streamdetails.provider) + async for audio_chunk in music_prov.get_audio_stream(streamdetails, 30): + await ffmpeg_proc.write(audio_chunk) + # write eof when last packet is received + ffmpeg_proc.write_eof() + + ffmpeg_proc.attach_task(writer()) # yield chunks from stdout - # we keep 1 chunk behind to detect end of stream properly - try: - prev_chunk = b"" - async for chunk in proc.iterate_chunks(): - if prev_chunk: - yield (False, prev_chunk) - prev_chunk = chunk - # send last chunk - yield (True, prev_chunk) - finally: - mass.signal_event( - MassEvent( - EventType.STREAM_ENDED, - object_id=streamdetails.provider.value, - data=streamdetails, - ) - ) + async for chunk in ffmpeg_proc.iterate_chunks(): + yield chunk + + +def get_chunksize(content_type: ContentType) -> int: + """Get a default chunksize for given contenttype.""" + if content_type.is_pcm(): + return 512000 + if content_type in ( + ContentType.AAC, + ContentType.M4A, + ): + return 32000 + if content_type in ( + ContentType.MP3, + ContentType.OGG, + ): + return 64000 + return 256000 diff --git a/music_assistant/helpers/process.py b/music_assistant/helpers/process.py index 958e64b1..95263786 100644 --- a/music_assistant/helpers/process.py +++ b/music_assistant/helpers/process.py @@ -12,22 +12,30 @@ from typing import AsyncGenerator, Coroutine, List, Optional, Tuple, Union from async_timeout import timeout as _timeout -LOGGER = logging.getLogger("AsyncProcess") +LOGGER = logging.getLogger(__name__) -DEFAULT_CHUNKSIZE = 512000 +DEFAULT_CHUNKSIZE = 128000 DEFAULT_TIMEOUT = 120 class AsyncProcess: """Implementation of a (truly) non blocking subprocess.""" - def __init__(self, args: Union[List, str], enable_write: bool = False): + def __init__( + self, + args: Union[List, str], + enable_write: bool = False, + chunk_size: int = DEFAULT_CHUNKSIZE, + use_stderr: bool = False, + ): """Initialize.""" self._proc = None self._args = args + self._use_stderr = use_stderr self._enable_write = enable_write self._attached_task: asyncio.Task = None self.closed = False + self.chunk_size = chunk_size or DEFAULT_CHUNKSIZE async def __aenter__(self) -> "AsyncProcess": """Enter context manager.""" @@ -40,16 +48,18 @@ class AsyncProcess: self._proc = await asyncio.create_subprocess_shell( args, stdin=asyncio.subprocess.PIPE if self._enable_write else None, - stdout=asyncio.subprocess.PIPE, - limit=DEFAULT_CHUNKSIZE * 10, + stdout=asyncio.subprocess.PIPE if not self._use_stderr else None, + stderr=asyncio.subprocess.PIPE if self._use_stderr else None, + limit=self.chunk_size * 5, close_fds=True, ) else: self._proc = await asyncio.create_subprocess_exec( *args, stdin=asyncio.subprocess.PIPE if self._enable_write else None, - stdout=asyncio.subprocess.PIPE, - limit=DEFAULT_CHUNKSIZE * 10, + stdout=asyncio.subprocess.PIPE if not self._use_stderr else None, + stderr=asyncio.subprocess.PIPE if self._use_stderr else None, + limit=self.chunk_size * 5, close_fds=True, ) return self @@ -59,53 +69,37 @@ class AsyncProcess: self.closed = True if self._attached_task: # cancel the attached reader/writer task - self._attached_task.cancel() - if self._proc.returncode is None: - # prevent subprocess deadlocking, send terminate and read remaining bytes try: - self._proc.terminate() - # close stdin and let it drain - if self._enable_write: - await self._proc.stdin.drain() - self._proc.stdin.close() - # read remaining bytes - await self._proc.stdout.read() - # we really want to make this thing die ;-) - self._proc.kill() - except ( - ProcessLookupError, - BrokenPipeError, - RuntimeError, - ConnectionResetError, - ): + self._attached_task.cancel() + await self._attached_task + except asyncio.CancelledError: pass - del self._proc + if self._proc.returncode is None: + # prevent subprocess deadlocking, read remaining bytes + await self._proc.communicate(b"" if self._enable_write else None) + if self._proc.returncode is None: + # just in case? + self._proc.kill() - async def iterate_chunks( - self, chunk_size: int = DEFAULT_CHUNKSIZE, timeout: int = DEFAULT_TIMEOUT - ) -> AsyncGenerator[bytes, None]: + async def iterate_chunks(self) -> AsyncGenerator[bytes, None]: """Yield chunks from the process stdout. Generator.""" while True: - chunk = await self.read(chunk_size, timeout) - if not chunk: - break + chunk = await self._read_chunk() yield chunk - if chunk_size is not None and len(chunk) < chunk_size: + if len(chunk) < self.chunk_size: + del chunk break + del chunk - async def read( - self, chunk_size: int = DEFAULT_CHUNKSIZE, timeout: int = DEFAULT_TIMEOUT - ) -> bytes: - """Read x bytes from the process stdout.""" + async def _read_chunk(self, timeout: int = DEFAULT_TIMEOUT) -> bytes: + """Read chunk_size bytes from the process stdout.""" + if self.closed: + return b"" try: async with _timeout(timeout): - if chunk_size is None: - return await self._proc.stdout.read(DEFAULT_CHUNKSIZE) - return await self._proc.stdout.readexactly(chunk_size) + return await self._proc.stdout.readexactly(self.chunk_size) except asyncio.IncompleteReadError as err: return err.partial - except AttributeError as exc: - raise asyncio.CancelledError() from exc except asyncio.TimeoutError: return b"" @@ -116,26 +110,31 @@ class AsyncProcess: try: self._proc.stdin.write(data) await self._proc.stdin.drain() - except (AttributeError, AssertionError, BrokenPipeError): + except ( + AttributeError, + AssertionError, + BrokenPipeError, + RuntimeError, + ConnectionResetError, + ) as err: # already exited, race condition - pass + raise asyncio.CancelledError() from err def write_eof(self) -> None: """Write end of file to to process stdin.""" - try: - if self._proc.stdin.can_write_eof(): - self._proc.stdin.write_eof() - except (AttributeError, AssertionError, BrokenPipeError): - # already exited, race condition - pass + if self.closed: + return + if self._proc.stdin.can_write_eof(): + self._proc.stdin.write_eof() async def communicate(self, input_data: Optional[bytes] = None) -> bytes: """Write bytes to process and read back results.""" return await self._proc.communicate(input_data) - def attach_task(self, coro: Coroutine) -> None: + def attach_task(self, coro: Coroutine) -> asyncio.Task: """Attach given coro func as reader/writer task to properly cancel it when needed.""" - self._attached_task = asyncio.create_task(coro) + self._attached_task = task = asyncio.create_task(coro) + return task async def check_output(shell_cmd: str) -> Tuple[int, bytes]: diff --git a/music_assistant/mass.py b/music_assistant/mass.py index e889f92f..d61aeeee 100644 --- a/music_assistant/mass.py +++ b/music_assistant/mass.py @@ -14,10 +14,11 @@ from uuid import uuid4 import aiohttp +from music_assistant.constants import ROOT_LOGGER_NAME from music_assistant.controllers.metadata import MetaDataController from music_assistant.controllers.music import MusicController from music_assistant.controllers.players import PlayerController -from music_assistant.controllers.stream import StreamController +from music_assistant.controllers.streams import StreamsController from music_assistant.helpers.cache import Cache from music_assistant.helpers.database import Database from music_assistant.models.background_job import BackgroundJob @@ -51,7 +52,7 @@ class MusicAssistant: self.loop: asyncio.AbstractEventLoop = None self.http_session: aiohttp.ClientSession = session self.http_session_provided = session is not None - self.logger = logging.getLogger(__name__) + self.logger = logging.getLogger(ROOT_LOGGER_NAME) self._listeners = [] self._jobs: Deque[BackgroundJob] = deque() @@ -63,7 +64,7 @@ class MusicAssistant: self.metadata = MetaDataController(self) self.music = MusicController(self) self.players = PlayerController(self) - self.streams = StreamController(self) + self.streams = StreamsController(self) self._tracked_tasks: List[asyncio.Task] = [] self.closed = False diff --git a/music_assistant/models/config.py b/music_assistant/models/config.py index 4130970e..f08b3d40 100644 --- a/music_assistant/models/config.py +++ b/music_assistant/models/config.py @@ -44,6 +44,6 @@ class MassConfig: providers: List[MusicProviderConfig] = field(default_factory=list) # advanced settings - max_simultaneous_jobs: int = 5 + max_simultaneous_jobs: int = 2 stream_port: int = select_stream_port() stream_ip: str = get_ip() diff --git a/music_assistant/models/enums.py b/music_assistant/models/enums.py index b1495e1d..b2be4df5 100644 --- a/music_assistant/models/enums.py +++ b/music_assistant/models/enums.py @@ -11,6 +11,7 @@ class MediaType(Enum): TRACK = "track" PLAYLIST = "playlist" RADIO = "radio" + URL = "url" UNKNOWN = "unknown" @@ -68,15 +69,6 @@ class AlbumType(Enum): UNKNOWN = "unknown" -class StreamType(Enum): - """Enum with stream types.""" - - EXECUTABLE = "executable" - URL = "url" - FILE = "file" - CACHE = "cache" - - class ContentType(Enum): """Enum with audio content/container types supported by ffmpeg.""" @@ -114,22 +106,6 @@ class ContentType(Enum): """Return if contentype is PCM.""" return self.name.startswith("PCM") - def sox_supported(self): - """Return if ContentType is supported by SoX.""" - return self.is_pcm() or self in [ - ContentType.OGG, - ContentType.FLAC, - ContentType.MP3, - ContentType.WAV, - ContentType.AIFF, - ] - - def sox_format(self): - """Convert the ContentType to SoX compatible format.""" - if not self.sox_supported(): - raise NotImplementedError - return self.value.replace("le", "") - @classmethod def from_bit_depth( cls, bit_depth: int, floating_point: bool = False @@ -186,8 +162,6 @@ class EventType(Enum): PLAYER_ADDED = "player_added" PLAYER_UPDATED = "player_updated" - STREAM_STARTED = "streaming_started" - STREAM_ENDED = "streaming_ended" QUEUE_ADDED = "queue_added" QUEUE_UPDATED = "queue_updated" QUEUE_ITEMS_UPDATED = "queue_items_updated" diff --git a/music_assistant/models/media_items.py b/music_assistant/models/media_items.py index bdc3b5db..2ae836c0 100755 --- a/music_assistant/models/media_items.py +++ b/music_assistant/models/media_items.py @@ -2,6 +2,7 @@ from __future__ import annotations from dataclasses import dataclass, field, fields +from time import time from typing import Any, Dict, List, Mapping, Optional, Set, Union from mashumaro import DataClassDictMixin @@ -17,7 +18,6 @@ from music_assistant.models.enums import ( MediaQuality, MediaType, ProviderType, - StreamType, ) MetadataTypes = Union[int, bool, str, List[str]] @@ -338,30 +338,50 @@ MediaItemType = Union[Artist, Album, Track, Radio, Playlist] class StreamDetails(DataClassDictMixin): """Model for streamdetails.""" - type: StreamType + # NOTE: the actual provider/itemid of the streamdetails may differ + # from the connected media_item due to track linking etc. + # the streamdetails are only used to provide details about the content + # that is going to be streamed. + + # mandatory fields provider: ProviderType item_id: str - path: str content_type: ContentType - player_id: str = "" - details: Dict[str, Any] = field(default_factory=dict) - seconds_played: int = 0 - seconds_skipped: int = 0 - gain_correct: float = 0 - loudness: Optional[float] = None + media_type: MediaType = MediaType.TRACK sample_rate: int = 44100 bit_depth: int = 16 channels: int = 2 - media_type: MediaType = MediaType.TRACK - queue_id: str = None + # stream_title: radio streams can optionally set this field + stream_title: Optional[str] = None + # duration of the item to stream, copied from media_item if omitted + duration: Optional[int] = None + # total size in bytes of the item, calculated at eof when omitted + size: Optional[int] = None + # expires: timestamp this streamdetails expire + expires: float = time() + 3600 + # data: provider specific data (not exposed externally) + data: Optional[Any] = None + + # the fields below will be set/controlled by the streamcontroller + queue_id: Optional[str] = None + seconds_streamed: int = 0 + seconds_skipped: int = 0 + gain_correct: Optional[float] = None + loudness: Optional[float] = None def __post_serialize__(self, d: Dict[Any, Any]) -> Dict[Any, Any]: """Exclude internal fields from dict.""" # pylint: disable=no-self-use - d.pop("path") - d.pop("details") + d.pop("data") + d.pop("expires") + d.pop("queue_id") return d def __str__(self): """Return pretty printable string of object.""" - return f"{self.type.value}/{self.content_type.value} - {self.provider.value}/{self.item_id}" + return self.uri + + @property + def uri(self) -> str: + """Return uri representation of item.""" + return f"{self.provider.value}://{self.media_type.value}/{self.item_id}" diff --git a/music_assistant/models/player.py b/music_assistant/models/player.py index 3af07abc..e92160d6 100755 --- a/music_assistant/models/player.py +++ b/music_assistant/models/player.py @@ -195,6 +195,22 @@ class Player(ABC): # SOME CONVENIENCE METHODS (may be overridden if needed) + @property + def stream_type(self) -> ContentType: + """Return supported/preferred stream type for playerqueue. Read only.""" + # determine default stream type from player capabilities + return next( + x + for x in ( + ContentType.FLAC, + ContentType.WAV, + ContentType.PCM_S16LE, + ContentType.MP3, + ContentType.MPEG, + ) + if x in self.supported_content_types + ) + async def volume_mute(self, muted: bool) -> None: """Send volume mute command to player.""" # for players that do not support mute, we fake mute with volume @@ -402,6 +418,20 @@ class PlayerGroup(Player): ) ) + @property + def supported_sample_rates(self) -> Tuple[int]: + """Return the sample rates this player supports.""" + return tuple( + sample_rate + for sample_rate in DEFAULT_SUPPORTED_SAMPLE_RATES + if all( + ( + sample_rate in child_player.supported_sample_rates + for child_player in self._get_child_players(False, False) + ) + ) + ) + async def stop(self) -> None: """Send STOP command to player.""" if not self.use_multi_stream: diff --git a/music_assistant/models/player_queue.py b/music_assistant/models/player_queue.py index 90f8d6b4..9f44e22a 100644 --- a/music_assistant/models/player_queue.py +++ b/music_assistant/models/player_queue.py @@ -2,27 +2,23 @@ from __future__ import annotations import asyncio +import os import pathlib import random from asyncio import Task, TimerHandle from dataclasses import dataclass from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple, Union -from music_assistant.helpers.audio import get_stream_details from music_assistant.models.enums import EventType, MediaType, QueueOption, RepeatMode -from music_assistant.models.errors import ( - MediaNotFoundError, - MusicAssistantError, - QueueEmpty, -) +from music_assistant.models.errors import MediaNotFoundError, MusicAssistantError from music_assistant.models.event import MassEvent -from music_assistant.models.media_items import StreamDetails from .player import Player, PlayerGroup, PlayerState from .queue_item import QueueItem from .queue_settings import QueueSettings if TYPE_CHECKING: + from music_assistant.controllers.streams import QueueStream from music_assistant.mass import MusicAssistant RESOURCES_DIR = ( @@ -41,7 +37,6 @@ class QueueSnapShot: powered: bool state: PlayerState - volume_level: int items: List[QueueItem] index: Optional[int] position: int @@ -55,35 +50,24 @@ class PlayerQueue: self.mass = mass self.logger = mass.players.logger self.queue_id = player_id + self.signal_next: bool = False + self._stream_id: str = "" self._settings = QueueSettings(self) self._current_index: Optional[int] = None - # index_in_buffer: which track is currently (pre)loaded in the streamer - self._index_in_buffer: Optional[int] = None self._current_item_elapsed_time: int = 0 self._prev_item: Optional[QueueItem] = None - # start_index: from which index did the queuestream start playing - self._start_index: int = 0 - # start_pos: from which position (in seconds) did the first track start playing? - self._start_pos: int = 0 - self._next_fadein: int = 0 - self._next_start_index: int = 0 - self._next_start_pos: int = 0 - self._last_state = PlayerState.IDLE + self._last_state = str self._items: List[QueueItem] = [] self._save_task: TimerHandle = None self._update_task: Task = None - self._signal_next: bool = False self._last_player_update: int = 0 - self._stream_url: str = "" + self._last_stream_id: str = "" self._snapshot: Optional[QueueSnapShot] = None async def setup(self) -> None: """Handle async setup of instance.""" await self._settings.restore() await self._restore_items() - self._stream_url: str = self.mass.streams.get_stream_url( - self.queue_id, content_type=self._settings.stream_type - ) self.mass.signal_event( MassEvent(EventType.QUEUE_ADDED, object_id=self.queue_id, data=self) ) @@ -103,12 +87,28 @@ class PlayerQueue: """Return if player(queue) is available.""" return self.player.available + @property + def stream(self) -> QueueStream | None: + """Return the currently connected/active stream for this queue.""" + return self.mass.streams.queue_streams.get(self._stream_id) + + @property + def index_in_buffer(self) -> int | None: + """Return the item that is curently loaded into the buffer.""" + if not self._stream_id: + return None + if stream := self.mass.streams.queue_streams.get(self._stream_id): + return stream.index_in_buffer + return None + @property def active(self) -> bool: """Return bool if the queue is currenty active on the player.""" - if self.player.use_multi_stream: - return self.queue_id in self.player.current_url - return self._stream_url == self.player.current_url + if not self.player.current_url: + return False + if stream := self.stream: + return stream.url == self.player.current_url + return False @property def elapsed_time(self) -> float: @@ -192,8 +192,7 @@ class PlayerQueue: QueueOption.REPLACE -> Replace queue contents with these items QueueOption.NEXT -> Play item(s) after current playing item QueueOption.ADD -> Append new items at end of the queue - :param passive: do not actually start playback. - Returns: the stream URL for this queue. + :param passive: if passive set to true the stream url will not be sent to the player. """ # a single item or list of items may be provided if not isinstance(uris, list): @@ -205,8 +204,8 @@ class PlayerQueue: media_item = await self.mass.music.get_item_by_uri(uri) except MusicAssistantError as err: # invalid MA uri or item not found error - if uri.startswith("http"): - # a plain url was provided + if uri.startswith("http") or os.path.isfile(uri): + # a plain url (or local file) was provided queue_items.append(QueueItem.from_url(uri)) continue raise MediaNotFoundError(f"Invalid uri: {uri}") from err @@ -247,23 +246,20 @@ class PlayerQueue: if self._current_index and self._current_index >= (len(self._items) - 1): self._current_index = None self._items = [] - # clear resume point if any - self._start_pos = 0 # load items into the queue if queue_opt == QueueOption.REPLACE: - await self.load(queue_items, passive=passive) + await self.load(queue_items, passive) elif ( queue_opt in [QueueOption.PLAY, QueueOption.NEXT] and len(queue_items) > 100 ): - await self.load(queue_items, passive=passive) + await self.load(queue_items, passive) elif queue_opt == QueueOption.NEXT: - await self.insert(queue_items, 1, passive=passive) + await self.insert(queue_items, 1, passive) elif queue_opt == QueueOption.PLAY: - await self.insert(queue_items, 0, passive=passive) + await self.insert(queue_items, 0, passive) elif queue_opt == QueueOption.ADD: await self.append(queue_items) - return self._stream_url async def play_alert( self, uri: str, announce: bool = False, volume_adjust: int = 10 @@ -277,7 +273,7 @@ class PlayerQueue: """ if self._snapshot: self.logger.debug("Ignore play_alert: already in progress") - return + # return # create snapshot await self.snapshot_create() @@ -286,7 +282,9 @@ class PlayerQueue: # prepend annnounce sound if needed if announce: - queue_items.append(QueueItem.from_url(ALERT_ANNOUNCE_FILE, "alert")) + queue_item = QueueItem.from_url(ALERT_ANNOUNCE_FILE, "alert") + queue_item.streamdetails.gain_correct = 10 + queue_items.append(queue_item) # parse provided uri into a MA MediaItem or Basic QueueItem from URL try: @@ -294,23 +292,21 @@ class PlayerQueue: queue_items.append(QueueItem.from_media_item(media_item)) except MusicAssistantError as err: # invalid MA uri or item not found error - if uri.startswith("http"): + if uri.startswith("http") or os.path.isfile(uri): # a plain url was provided - queue_items.append(QueueItem.from_url(uri, "alert")) + queue_item = QueueItem.from_url(uri, "alert") + queue_item.streamdetails.gain_correct = 6 + queue_items.append(queue_item) else: raise MediaNotFoundError(f"Invalid uri: {uri}") from err # append silence track, we use this to reliably detect when the alert is ready silence_url = self.mass.streams.get_silence_url(600) - queue_items.append(QueueItem.from_url(silence_url, "alert")) + queue_item = QueueItem.from_url(silence_url, "alert") + queue_items.append(queue_item) # load queue with alert sound(s) await self.load(queue_items) - # set new volume - new_volume = self.player.volume_level + ( - self.player.volume_level / 100 * volume_adjust - ) - await self.player.volume_set(new_volume) # wait for the alert to finish playing alert_done = asyncio.Event() @@ -336,6 +332,7 @@ class PlayerQueue: async def stop(self) -> None: """Stop command on queue player.""" + self.signal_next = False # redirect to underlying player await self.player.stop() @@ -399,7 +396,9 @@ class PlayerQueue: resume_pos = 0 if resume_item is not None: - await self.play_index(resume_item.item_id, resume_pos, 5) + resume_pos = resume_pos if resume_pos > 10 else 0 + fade_in = 5 if resume_pos else 0 + await self.play_index(resume_item.item_id, resume_pos, fade_in) else: self.logger.warning( "resume queue requested for %s but queue is empty", self.queue_id @@ -410,7 +409,6 @@ class PlayerQueue: self._snapshot = QueueSnapShot( powered=self.player.powered, state=self.player.state, - volume_level=self.player.volume_level, items=self._items, index=self._current_index, position=self._current_item_elapsed_time, @@ -421,11 +419,10 @@ class PlayerQueue: assert self._snapshot, "Create snapshot before restoring it." # clear queue first await self.clear() - # restore volume - await self.player.volume_set(self._snapshot.volume_level) # restore queue await self.update(self._snapshot.items) self._current_index = self._snapshot.index + self._current_item_elapsed_time = self._snapshot.position if self._snapshot.state in (PlayerState.PLAYING, PlayerState.PAUSED): await self.resume() if self._snapshot.state == PlayerState.PAUSED: @@ -443,8 +440,6 @@ class PlayerQueue: passive: bool = False, ) -> None: """Play item at index (or item_id) X in queue.""" - if self.player.use_multi_stream: - await self.mass.streams.stop_multi_client_queue_stream(self.queue_id) if not isinstance(index, int): index = self.index_by_id(index) if index is None: @@ -452,38 +447,8 @@ class PlayerQueue: if not len(self.items) > index: return self._current_index = index - self._next_start_index = index - self._next_start_pos = int(seek_position) - self._next_fadein = fade_in - # send stream url to player connected to this queue - self._stream_url = self.mass.streams.get_stream_url( - self.queue_id, content_type=self._settings.stream_type - ) - - if self.player.use_multi_stream: - # multi stream enabled, all child players should receive the same audio stream - # redirect command to all (powered) players - coros = [] - expected_clients = set() - for child_id in self.player.group_childs: - if child_player := self.mass.players.get_player(child_id): - if child_player.powered: - # TODO: this assumes that all client players support flac - player_url = self.mass.streams.get_stream_url( - self.queue_id, child_id, self._settings.stream_type - ) - expected_clients.add(child_id) - coros.append(child_player.play_url(player_url)) - await self.mass.streams.start_multi_client_queue_stream( - # TODO: this assumes that all client players support flac - self.queue_id, - expected_clients, - self._settings.stream_type, - ) - await asyncio.gather(*coros) - elif not passive: - # regular (single player) request - await self.player.play_url(self._stream_url) + # start the queue stream + await self.queue_stream_start(index, int(seek_position), fade_in, passive) async def move_item(self, queue_item_id: str, pos_shift: int = 1) -> None: """ @@ -510,9 +475,10 @@ class PlayerQueue: async def delete_item(self, queue_item_id: str) -> None: """Delete item (by id or index) from the queue.""" item_index = self.index_by_id(queue_item_id) - if item_index <= self._index_in_buffer: + if self.stream and item_index <= self.index_in_buffer: # ignore request if track already loaded in the buffer # the frontend should guard so this is just in case + self.logger.warning("delete requested for item already loaded in buffer") return self._items.pop(item_index) self.signal_update(True) @@ -538,9 +504,10 @@ class PlayerQueue: :param queue_items: a list of QueueItem :param offset: offset from current queue position """ - if not self.items or self._current_index is None: - return await self.load(queue_items) - insert_at_index = self._current_index + offset + cur_index = self.index_in_buffer or self._current_index + if not self.items or cur_index is None: + return await self.load(queue_items, passive) + insert_at_index = cur_index + offset for index, item in enumerate(queue_items): item.sort_index = insert_at_index + index if self.settings.shuffle_enabled and len(queue_items) > 5: @@ -559,7 +526,7 @@ class PlayerQueue: + self._items[insert_at_index:] ) - if offset in (0, self._index_in_buffer): + if offset in (0, cur_index): await self.play_index(insert_at_index, passive=passive) self.signal_update(True) @@ -592,9 +559,10 @@ class PlayerQueue: def on_player_update(self) -> None: """Call when player updates.""" - if self._last_state != self.player.state: + player_state_str = f"{self.player.state.value}.{self.player.current_url}" + if self._last_state != player_state_str: # playback state changed - self._last_state = self.player.state + self._last_state = player_state_str # always signal update if playback state changed self.signal_update() @@ -606,13 +574,10 @@ class PlayerQueue: ): self._current_index += 1 self._current_item_elapsed_time = 0 - # repeat enabled (of whole queue), play queue from beginning - if self.settings.repeat_mode == RepeatMode.ALL: - self.mass.create_task(self.play_index(0)) # handle case where stream stopped on purpose and we need to restart it - elif self._signal_next: - self._signal_next = False + elif self.signal_next: + self.signal_next = False self.mass.create_task(self.resume()) # start poll/updater task if playback starts on player @@ -668,55 +633,58 @@ class PlayerQueue: ) ) - async def queue_stream_prepare(self) -> StreamDetails: - """Call when queue_streamer is about to start playing.""" - start_from_index = self._next_start_index - try: - next_item = self._items[start_from_index] - except (IndexError, TypeError) as err: - raise QueueEmpty() from err - try: - return await get_stream_details(self.mass, next_item, self.queue_id) - except MediaNotFoundError as err: - # something bad happened, try to recover by requesting the next track in the queue - await self.play_index(self._current_index + 2) - raise err - - async def queue_stream_start(self) -> Tuple[int, int, int]: - """Call when queue_streamer starts playing the queue stream.""" - start_from_index = self._next_start_index + async def queue_stream_start( + self, start_index: int, seek_position: int, fade_in: bool, passive: bool = False + ) -> None: + """Start the queue stream runner.""" + players: List[Player] = [] + output_format = self._settings.stream_type + # if multi stream is enabled, all child players should receive the same audio stream + if self.player.use_multi_stream: + for child_id in self.player.group_childs: + child_player = self.mass.players.get_player(child_id) + if not child_player or not child_player.powered: + continue + players.append(child_player) + else: + # regular (single player) request + players.append(self.player) + self._current_item_elapsed_time = 0 - self._current_index = start_from_index - self._start_index = start_from_index - self._next_start_index = self.get_next_index(start_from_index) - self._index_in_buffer = start_from_index - seek_position = self._next_start_pos - self._next_start_pos = 0 - fade_in = self._next_fadein - self._next_fadein = 0 - return (start_from_index, seek_position, fade_in) - - async def queue_stream_next(self, cur_index: int) -> int | None: - """Call when queue_streamer loads next track in buffer.""" - next_idx = self._next_start_index - self._index_in_buffer = next_idx - self._next_start_index = self.get_next_index(self._next_start_index) - return next_idx - - def get_next_index(self, cur_index: int) -> int: + self._current_index = start_index + + # start the queue stream background task + stream = await self.mass.streams.start_queue_stream( + queue=self, + expected_clients=len(players), + start_index=start_index, + seek_position=seek_position, + fade_in=fade_in, + output_format=output_format, + ) + self._stream_id = stream.stream_id + # execute the play command on the player(s) + if not passive: + await asyncio.gather(*[x.play_url(stream.url) for x in players]) + + def get_next_index(self, cur_index: Optional[int]) -> int: """Return the next index for the queue, accounting for repeat settings.""" - if not self._items or cur_index is None: - raise IndexError("No (more) items in queue") + # handle repeat single track if self.settings.repeat_mode == RepeatMode.ONE: return cur_index + # handle repeat all + if ( + self.settings.repeat_mode == RepeatMode.ALL + and self._items + and cur_index == (len(self._items) - 1) + ): + return 0 # simply return the next index. other logic is guarded to detect the index # being higher than the number of items to detect end of queue and/or handle repeat. + if cur_index is None: + return 0 return cur_index + 1 - async def queue_stream_signal_next(self): - """Indicate that queue stream needs to start next index once playback finished.""" - self._signal_next = True - def signal_update(self, items_changed: bool = False) -> None: """Signal state changed of this queue.""" if items_changed: @@ -744,6 +712,7 @@ class PlayerQueue: "state": self.player.state.value, "available": self.player.available, "current_index": self.current_index, + "index_in_buffer": self.index_in_buffer, "current_item": cur_item, "next_item": next_item, "settings": self.settings.to_dict(), @@ -756,9 +725,9 @@ class PlayerQueue: elapsed_time_queue = self.player.elapsed_time total_time = 0 track_time = 0 - if self._items and len(self._items) > self._start_index: - # start_index: holds the last starting position - queue_index = self._start_index + if self._items and self.stream and len(self._items) > self.stream.start_index: + # start_index: holds the position from which the stream started + queue_index = self.stream.start_index queue_track = None while len(self._items) > queue_index: # keep enumerating the queue tracks to find current track @@ -767,11 +736,15 @@ class PlayerQueue: if not queue_track.streamdetails: track_time = elapsed_time_queue - total_time break - track_seconds = queue_track.streamdetails.seconds_played - if elapsed_time_queue > (track_seconds + total_time): + duration = ( + queue_track.streamdetails.seconds_streamed or queue_track.duration + ) + if duration is not None and elapsed_time_queue > ( + duration + total_time + ): # total elapsed time is more than (streamed) track duration # move index one up - total_time += track_seconds + total_time += duration queue_index += 1 else: # no more seconds left to divide, this is our track diff --git a/music_assistant/models/provider.py b/music_assistant/models/provider.py index 62ba1f4e..3c6038fd 100644 --- a/music_assistant/models/provider.py +++ b/music_assistant/models/provider.py @@ -12,9 +12,9 @@ from music_assistant.models.media_items import ( MediaItemType, Playlist, Radio, + StreamDetails, Track, ) -from music_assistant.models.player_queue import StreamDetails if TYPE_CHECKING: from music_assistant.mass import MusicAssistant @@ -170,10 +170,16 @@ class MusicProvider: if MediaType.PLAYLIST in self.supported_mediatypes: raise NotImplementedError - async def get_stream_details(self, item_id: str) -> StreamDetails: + async def get_stream_details(self, item_id: str) -> StreamDetails | None: """Get streamdetails for a track/radio.""" raise NotImplementedError + async def get_audio_stream( + self, streamdetails: StreamDetails, seek_position: int = 0 + ) -> AsyncGenerator[bytes, None]: + """Return the audio stream for the provider item.""" + raise NotImplementedError + async def get_item(self, media_type: MediaType, prov_item_id: str) -> MediaItemType: """Get single MediaItem from provider.""" if media_type == MediaType.ARTIST: diff --git a/music_assistant/models/queue_item.py b/music_assistant/models/queue_item.py index 6dc2045e..878270ed 100644 --- a/music_assistant/models/queue_item.py +++ b/music_assistant/models/queue_item.py @@ -7,7 +7,7 @@ from uuid import uuid4 from mashumaro import DataClassDictMixin -from music_assistant.models.enums import MediaType +from music_assistant.models.enums import ContentType, MediaType, ProviderType from music_assistant.models.media_items import Radio, StreamDetails, Track @@ -46,14 +46,24 @@ class QueueItem(DataClassDictMixin): return d @classmethod - def from_url(cls, url: str, name: Optional[str] = None) -> QueueItem: - """Create QueueItem from plain url.""" + def from_url( + cls, + url: str, + name: Optional[str] = None, + media_type: MediaType = MediaType.URL, + ) -> QueueItem: + """Create QueueItem from plain url (or local file).""" return cls( uri=url, name=name or url.split("?")[0], - media_type=MediaType.UNKNOWN, - image=None, - media_item=None, + media_type=media_type, + streamdetails=StreamDetails( + provider=ProviderType.URL, + item_id=url, + content_type=ContentType.try_parse(url), + media_type=media_type, + data=url, + ), ) @classmethod diff --git a/music_assistant/models/queue_settings.py b/music_assistant/models/queue_settings.py index e69b5220..62cac2aa 100644 --- a/music_assistant/models/queue_settings.py +++ b/music_assistant/models/queue_settings.py @@ -23,7 +23,7 @@ class QueueSettings: self._crossfade_mode: CrossFadeMode = CrossFadeMode.DISABLED self._crossfade_duration: int = 6 self._volume_normalization_enabled: bool = True - self._volume_normalization_target: int = -23 + self._volume_normalization_target: int = -14 @property def repeat_mode(self) -> RepeatMode: @@ -124,18 +124,7 @@ class QueueSettings: @property def stream_type(self) -> ContentType: """Return supported/preferred stream type for playerqueue. Read only.""" - # determine default stream type from player capabilities - return next( - x - for x in ( - ContentType.FLAC, - ContentType.WAV, - ContentType.PCM_S16LE, - ContentType.MP3, - ContentType.MPEG, - ) - if x in self._queue.player.supported_content_types - ) + return self._queue.player.stream_type def to_dict(self) -> Dict[str, Any]: """Return dict from settings.""" -- 2.34.1