From: Marcel van der Veldt Date: Sat, 23 Nov 2024 01:13:40 +0000 (+0100) Subject: Feat: Add base support for audio sources X-Git-Url: https://git.kitaultman.com/?a=commitdiff_plain;h=0f24bd2a58e0242e8415d125adf733254b944ad1;p=music-assistant-server.git Feat: Add base support for audio sources --- diff --git a/music_assistant/controllers/players.py b/music_assistant/controllers/players.py index 669077a5..f230fe62 100644 --- a/music_assistant/controllers/players.py +++ b/music_assistant/controllers/players.py @@ -46,6 +46,7 @@ from music_assistant.constants import ( from music_assistant.helpers.api import api_command from music_assistant.helpers.tags import parse_tags from music_assistant.helpers.throttle_retry import Throttler +from music_assistant.helpers.uri import parse_uri from music_assistant.helpers.util import TaskManager, get_changed_values, lock from music_assistant.models.core_controller import CoreController from music_assistant.models.player_provider import PlayerProvider @@ -619,6 +620,32 @@ class PlayerController(CoreController): async with self._player_throttlers[player_id]: await player_prov.enqueue_next_media(player_id=player_id, media=media) + async def select_source(self, player_id: str, source: str) -> None: + """ + Handle SELECT SOURCE command on given player. + + - player_id: player_id of the player to handle the command. + - source: The ID of the source that needs to be activated/selected. + """ + player = self.get(player_id, True) + # handle source_id from source plugin + if "://plugin_source/" in source: + await self._play_plugin_source(player, source) + return + # basic check if player supports source selection + if PlayerFeature.SELECT_SOURCE not in player.supported_features: + raise UnsupportedFeaturedException( + f"Player {player.display_name} does not support source selection" + ) + # basic check if source is valid for player + if not any(x for x in player.source_list if x.id == source): + raise PlayerCommandFailed( + f"{source} is an invalid source for player {player.display_name}" + ) + # forward to player provider + provider = self.mass.get_provider(player.provider) + await provider.select_source(player_id, source) + @api_command("players/cmd/group") @handle_player_command async def cmd_group(self, player_id: str, target_player: str) -> None: @@ -1294,6 +1321,23 @@ class PlayerController(CoreController): self.logger.warning("Can not resume %s on %s", prev_item_id, player.display_name) # TODO !! + async def _play_plugin_source(self, player: Player, source: str) -> None: + """Handle playback of a plugin source on the player.""" + _, provider_id, source_id = await parse_uri(source) + if not (provider := self.mass.get_provider(provider_id)): + raise PlayerCommandFailed(f"Invalid (plugin)source {source}") + player_source = await provider.get_source(source_id) + url = self.mass.streams.get_plugin_source_url(provider_id, source_id, player.player_id) + # create a PlayerMedia object for the plugin source so + # we can send a regular play-media call downstream + media = player_source.metadata or PlayerMedia( + uri=url, + media_type=MediaType.OTHER, + title=player_source.name, + custom_data={"source": source}, + ) + await self.play_media(player.player_id, media) + async def _poll_players(self) -> None: """Background task that polls players for updates.""" while True: diff --git a/music_assistant/controllers/streams.py b/music_assistant/controllers/streams.py index 8cc484b1..0a25ebda 100644 --- a/music_assistant/controllers/streams.py +++ b/music_assistant/controllers/streams.py @@ -65,6 +65,7 @@ from music_assistant.helpers.ffmpeg import get_ffmpeg_stream from music_assistant.helpers.util import get_ip, get_ips, select_free_port, try_parse_bool from music_assistant.helpers.webserver import Webserver from music_assistant.models.core_controller import CoreController +from music_assistant.models.plugin import PluginProvider if TYPE_CHECKING: from music_assistant_models.config_entries import CoreConfig @@ -262,6 +263,11 @@ class StreamsController(CoreController): "/announcement/{player_id}.{fmt}", self.serve_announcement_stream, ), + ( + "*", + "/pluginsource/{provider_id}/{source_id}/{player_id}.{fmt}", + self.serve_plugin_source_stream, + ), ], ) @@ -588,6 +594,85 @@ class StreamsController(CoreController): return resp + async def serve_plugin_source_stream(self, request: web.Request) -> web.Response: + """Stream PluginSource audio to a player.""" + self._log_request(request) + provider_id = request.match_info["provider_id"] + provider: PluginProvider | None + if not (provider := self.mass.get_provider(provider_id)): + raise web.HTTPNotFound(reason=f"Unknown Provider: {provider_id}") + source_id = request.match_info["source_id"] + if not (source := await provider.get_source(source_id)): + raise web.HTTPNotFound(reason=f"Unknown PluginSource: {source_id}") + try: + streamdetails = await provider.get_stream_details(source_id, MediaType.PLUGIN_SOURCE) + except Exception: + err_msg = f"No streamdetails for PluginSource: {source_id}" + self.logger.error(err_msg) + raise web.HTTPNotFound(reason=err_msg) + + # work out output format/details + player_id = request.match_info["player_id"] + player = self.mass.players.get(player_id) + if not player: + raise web.HTTPNotFound(reason=f"Unknown Player: {player_id}") + output_format = await self._get_output_format( + output_format_str=request.match_info["fmt"], + player=player, + default_sample_rate=streamdetails.audio_format.sample_rate, + default_bit_depth=streamdetails.audio_format.bit_depth, + ) + + # prepare request, add some DLNA/UPNP compatible headers + headers = { + **DEFAULT_STREAM_HEADERS, + "icy-name": source.name, + } + resp = web.StreamResponse( + status=200, + reason="OK", + headers=headers, + ) + resp.content_type = f"audio/{output_format.output_format_str}" + http_profile: str = await self.mass.config.get_player_config_value( + player_id, CONF_HTTP_PROFILE + ) + if http_profile == "forced_content_length" and streamdetails.duration: + # guess content length based on duration + resp.content_length = get_chunksize(output_format, streamdetails.duration) + elif http_profile == "chunked": + resp.enable_chunked_encoding() + + await resp.prepare(request) + + # return early if this is not a GET request + if request.method != "GET": + return resp + + # all checks passed, start streaming! + self.logger.debug( + "Start serving audio stream for PluginSource %s (%s) to %s", + source.name, + source.id, + player.display_name, + ) + async for chunk in self.get_plugin_source_stream( + streamdetails, + output_format=output_format, + ): + try: + await resp.write(chunk) + except (BrokenPipeError, ConnectionResetError, ConnectionError): + break + if streamdetails.stream_error: + self.logger.error( + "Error streaming PluginSource %s (%s) to %s", + source.name, + source.uri, + player.display_name, + ) + return resp + def get_command_url(self, player_or_queue_id: str, command: str) -> str: """Get the url for the special command stream.""" return f"{self.base_url}/command/{player_or_queue_id}/{command}.mp3" @@ -606,6 +691,20 @@ class StreamsController(CoreController): # like https hosts and it also offers the pre-announce 'bell' return f"{self.base_url}/announcement/{player_id}.{content_type.value}?pre_announce={use_pre_announce}" # noqa: E501 + def get_plugin_source_url( + self, + provider: str, + source_id: str, + player_id: str, + output_codec: ContentType = ContentType.FLAC, + ) -> str: + """Get the url for the Plugin Source stream/proxy.""" + fmt = output_codec.value + # handle raw pcm without exact format specifiers + if output_codec.is_pcm() and ";" not in fmt: + fmt += f";codec=pcm;rate={44100};bitrate={16};channels={2}" + return f"{self._server.base_url}/pluginsource/{provider}/{source_id}/{player_id}.{fmt}" + async def get_flow_stream( self, queue: PlayerQueue, @@ -887,6 +986,38 @@ class StreamsController(CoreController): ): yield chunk + async def get_plugin_source_stream( + self, + streamdetails: StreamDetails, + output_format: AudioFormat, + ) -> AsyncGenerator[tuple[bool, bytes], None]: + """Get the audio stream for a PluginSource.""" + streamdetails.seek_position = 0 + extra_input_args = ["-re"] + # work out audio source for these streamdetails + if streamdetails.stream_type == StreamType.CUSTOM: + audio_source = self.mass.get_provider(streamdetails.provider).get_audio_stream( + streamdetails, + seek_position=streamdetails.seek_position, + ) + elif streamdetails.stream_type == StreamType.HLS: + substream = await get_hls_substream(self.mass, streamdetails.path) + audio_source = substream.path + else: + audio_source = streamdetails.path + + # add support for decryption key provided in streamdetails + if streamdetails.decryption_key: + extra_input_args += ["-decryption_key", streamdetails.decryption_key] + + async for chunk in get_ffmpeg_stream( + audio_input=audio_source, + input_format=streamdetails.audio_format, + output_format=output_format, + extra_input_args=extra_input_args, + ): + yield chunk + def _log_request(self, request: web.Request) -> None: """Log request.""" if not self.logger.isEnabledFor(VERBOSE_LOG_LEVEL): diff --git a/music_assistant/models/player_provider.py b/music_assistant/models/player_provider.py index f7f7ee64..a737da17 100644 --- a/music_assistant/models/player_provider.py +++ b/music_assistant/models/player_provider.py @@ -123,6 +123,11 @@ class PlayerProvider(Provider): # will only be called for players with PLAY_ANNOUNCEMENT feature set. raise NotImplementedError + async def select_source(self, player_id: str, source: str) -> None: + """Handle SELECT SOURCE command on given player.""" + # will only be called for sources that are defined in 'source_list'. + raise NotImplementedError + async def cmd_power(self, player_id: str, powered: bool) -> None: """Send POWER command to given player. diff --git a/music_assistant/models/plugin.py b/music_assistant/models/plugin.py index 8c0ebc5e..f5f98c8f 100644 --- a/music_assistant/models/plugin.py +++ b/music_assistant/models/plugin.py @@ -2,8 +2,17 @@ from __future__ import annotations +from collections.abc import AsyncGenerator +from typing import TYPE_CHECKING + +from music_assistant_models.enums import MediaType + from .provider import Provider +if TYPE_CHECKING: + from music_assistant_models.player import PlayerSource + from music_assistant_models.streamdetails import StreamDetails + # ruff: noqa: ARG001, ARG002 @@ -13,3 +22,35 @@ class PluginProvider(Provider): Plugin Provider implementations should inherit from this base model. """ + + async def get_sources(self) -> list[PlayerSource]: # type: ignore[return] + """Get all audio sources provided by this provider.""" + # Will only be called if ProviderFeature.AUDIO_SOURCE is declared + raise NotImplementedError + + async def get_source(self, prov_source_id: str) -> PlayerSource: # type: ignore[return] + """Get AudioSource details by id.""" + # Will only be called if ProviderFeature.AUDIO_SOURCE is declared + raise NotImplementedError + + async def get_stream_details( + self, item_id: str, media_type: MediaType = MediaType.OTHER + ) -> StreamDetails: + """Return the streamdetails to stream a naudiosource provided by this plugin.""" + # Will only be called if ProviderFeature.AUDIO_SOURCE is declared + raise NotImplementedError + + async def get_audio_stream( + self, streamdetails: StreamDetails, seek_position: int = 0 + ) -> AsyncGenerator[bytes, None]: + """ + Return the (custom) audio stream for the provider item. + + Will only be called when the stream_type is set to CUSTOM. + """ + if False: + yield b"" + raise NotImplementedError + + async def on_streamed(self, streamdetails: StreamDetails, seconds_streamed: int) -> None: + """Handle callback when an item completed streaming."""