Feat: Add base support for audio sources
authorMarcel van der Veldt <m.vanderveldt@outlook.com>
Sat, 23 Nov 2024 01:13:40 +0000 (02:13 +0100)
committerMarcel van der Veldt <m.vanderveldt@outlook.com>
Sat, 23 Nov 2024 01:13:40 +0000 (02:13 +0100)
music_assistant/controllers/players.py
music_assistant/controllers/streams.py
music_assistant/models/player_provider.py
music_assistant/models/plugin.py

index 669077a5ac823ca0fab11ef7da7f95746ebaf76c..f230fe62117d453e6d85f3056c8588be2d0c498b 100644 (file)
@@ -46,6 +46,7 @@ from music_assistant.constants import (
 from music_assistant.helpers.api import api_command
 from music_assistant.helpers.tags import parse_tags
 from music_assistant.helpers.throttle_retry import Throttler
+from music_assistant.helpers.uri import parse_uri
 from music_assistant.helpers.util import TaskManager, get_changed_values, lock
 from music_assistant.models.core_controller import CoreController
 from music_assistant.models.player_provider import PlayerProvider
@@ -619,6 +620,32 @@ class PlayerController(CoreController):
         async with self._player_throttlers[player_id]:
             await player_prov.enqueue_next_media(player_id=player_id, media=media)
 
+    async def select_source(self, player_id: str, source: str) -> None:
+        """
+        Handle SELECT SOURCE command on given player.
+
+        - player_id: player_id of the player to handle the command.
+        - source: The ID of the source that needs to be activated/selected.
+        """
+        player = self.get(player_id, True)
+        # handle source_id from source plugin
+        if "://plugin_source/" in source:
+            await self._play_plugin_source(player, source)
+            return
+        # basic check if player supports source selection
+        if PlayerFeature.SELECT_SOURCE not in player.supported_features:
+            raise UnsupportedFeaturedException(
+                f"Player {player.display_name} does not support source selection"
+            )
+        # basic check if source is valid for player
+        if not any(x for x in player.source_list if x.id == source):
+            raise PlayerCommandFailed(
+                f"{source} is an invalid source for player {player.display_name}"
+            )
+        # forward to player provider
+        provider = self.mass.get_provider(player.provider)
+        await provider.select_source(player_id, source)
+
     @api_command("players/cmd/group")
     @handle_player_command
     async def cmd_group(self, player_id: str, target_player: str) -> None:
@@ -1294,6 +1321,23 @@ class PlayerController(CoreController):
             self.logger.warning("Can not resume %s on %s", prev_item_id, player.display_name)
             # TODO !!
 
+    async def _play_plugin_source(self, player: Player, source: str) -> None:
+        """Handle playback of a plugin source on the player."""
+        _, provider_id, source_id = await parse_uri(source)
+        if not (provider := self.mass.get_provider(provider_id)):
+            raise PlayerCommandFailed(f"Invalid (plugin)source {source}")
+        player_source = await provider.get_source(source_id)
+        url = self.mass.streams.get_plugin_source_url(provider_id, source_id, player.player_id)
+        # create a PlayerMedia object for the plugin source so
+        # we can send a regular play-media call downstream
+        media = player_source.metadata or PlayerMedia(
+            uri=url,
+            media_type=MediaType.OTHER,
+            title=player_source.name,
+            custom_data={"source": source},
+        )
+        await self.play_media(player.player_id, media)
+
     async def _poll_players(self) -> None:
         """Background task that polls players for updates."""
         while True:
index 8cc484b16ea3b68b445fb677a7347ff15933827f..0a25ebda9190287074b3ec6d252b21a888590cb7 100644 (file)
@@ -65,6 +65,7 @@ from music_assistant.helpers.ffmpeg import get_ffmpeg_stream
 from music_assistant.helpers.util import get_ip, get_ips, select_free_port, try_parse_bool
 from music_assistant.helpers.webserver import Webserver
 from music_assistant.models.core_controller import CoreController
+from music_assistant.models.plugin import PluginProvider
 
 if TYPE_CHECKING:
     from music_assistant_models.config_entries import CoreConfig
@@ -262,6 +263,11 @@ class StreamsController(CoreController):
                     "/announcement/{player_id}.{fmt}",
                     self.serve_announcement_stream,
                 ),
+                (
+                    "*",
+                    "/pluginsource/{provider_id}/{source_id}/{player_id}.{fmt}",
+                    self.serve_plugin_source_stream,
+                ),
             ],
         )
 
@@ -588,6 +594,85 @@ class StreamsController(CoreController):
 
         return resp
 
+    async def serve_plugin_source_stream(self, request: web.Request) -> web.Response:
+        """Stream PluginSource audio to a player."""
+        self._log_request(request)
+        provider_id = request.match_info["provider_id"]
+        provider: PluginProvider | None
+        if not (provider := self.mass.get_provider(provider_id)):
+            raise web.HTTPNotFound(reason=f"Unknown Provider: {provider_id}")
+        source_id = request.match_info["source_id"]
+        if not (source := await provider.get_source(source_id)):
+            raise web.HTTPNotFound(reason=f"Unknown PluginSource: {source_id}")
+        try:
+            streamdetails = await provider.get_stream_details(source_id, MediaType.PLUGIN_SOURCE)
+        except Exception:
+            err_msg = f"No streamdetails for PluginSource: {source_id}"
+            self.logger.error(err_msg)
+            raise web.HTTPNotFound(reason=err_msg)
+
+        # work out output format/details
+        player_id = request.match_info["player_id"]
+        player = self.mass.players.get(player_id)
+        if not player:
+            raise web.HTTPNotFound(reason=f"Unknown Player: {player_id}")
+        output_format = await self._get_output_format(
+            output_format_str=request.match_info["fmt"],
+            player=player,
+            default_sample_rate=streamdetails.audio_format.sample_rate,
+            default_bit_depth=streamdetails.audio_format.bit_depth,
+        )
+
+        # prepare request, add some DLNA/UPNP compatible headers
+        headers = {
+            **DEFAULT_STREAM_HEADERS,
+            "icy-name": source.name,
+        }
+        resp = web.StreamResponse(
+            status=200,
+            reason="OK",
+            headers=headers,
+        )
+        resp.content_type = f"audio/{output_format.output_format_str}"
+        http_profile: str = await self.mass.config.get_player_config_value(
+            player_id, CONF_HTTP_PROFILE
+        )
+        if http_profile == "forced_content_length" and streamdetails.duration:
+            # guess content length based on duration
+            resp.content_length = get_chunksize(output_format, streamdetails.duration)
+        elif http_profile == "chunked":
+            resp.enable_chunked_encoding()
+
+        await resp.prepare(request)
+
+        # return early if this is not a GET request
+        if request.method != "GET":
+            return resp
+
+        # all checks passed, start streaming!
+        self.logger.debug(
+            "Start serving audio stream for PluginSource %s (%s) to %s",
+            source.name,
+            source.id,
+            player.display_name,
+        )
+        async for chunk in self.get_plugin_source_stream(
+            streamdetails,
+            output_format=output_format,
+        ):
+            try:
+                await resp.write(chunk)
+            except (BrokenPipeError, ConnectionResetError, ConnectionError):
+                break
+        if streamdetails.stream_error:
+            self.logger.error(
+                "Error streaming PluginSource %s (%s) to %s",
+                source.name,
+                source.uri,
+                player.display_name,
+            )
+        return resp
+
     def get_command_url(self, player_or_queue_id: str, command: str) -> str:
         """Get the url for the special command stream."""
         return f"{self.base_url}/command/{player_or_queue_id}/{command}.mp3"
@@ -606,6 +691,20 @@ class StreamsController(CoreController):
         # like https hosts and it also offers the pre-announce 'bell'
         return f"{self.base_url}/announcement/{player_id}.{content_type.value}?pre_announce={use_pre_announce}"  # noqa: E501
 
+    def get_plugin_source_url(
+        self,
+        provider: str,
+        source_id: str,
+        player_id: str,
+        output_codec: ContentType = ContentType.FLAC,
+    ) -> str:
+        """Get the url for the Plugin Source stream/proxy."""
+        fmt = output_codec.value
+        # handle raw pcm without exact format specifiers
+        if output_codec.is_pcm() and ";" not in fmt:
+            fmt += f";codec=pcm;rate={44100};bitrate={16};channels={2}"
+        return f"{self._server.base_url}/pluginsource/{provider}/{source_id}/{player_id}.{fmt}"
+
     async def get_flow_stream(
         self,
         queue: PlayerQueue,
@@ -887,6 +986,38 @@ class StreamsController(CoreController):
         ):
             yield chunk
 
+    async def get_plugin_source_stream(
+        self,
+        streamdetails: StreamDetails,
+        output_format: AudioFormat,
+    ) -> AsyncGenerator[tuple[bool, bytes], None]:
+        """Get the audio stream for a PluginSource."""
+        streamdetails.seek_position = 0
+        extra_input_args = ["-re"]
+        # work out audio source for these streamdetails
+        if streamdetails.stream_type == StreamType.CUSTOM:
+            audio_source = self.mass.get_provider(streamdetails.provider).get_audio_stream(
+                streamdetails,
+                seek_position=streamdetails.seek_position,
+            )
+        elif streamdetails.stream_type == StreamType.HLS:
+            substream = await get_hls_substream(self.mass, streamdetails.path)
+            audio_source = substream.path
+        else:
+            audio_source = streamdetails.path
+
+        # add support for decryption key provided in streamdetails
+        if streamdetails.decryption_key:
+            extra_input_args += ["-decryption_key", streamdetails.decryption_key]
+
+        async for chunk in get_ffmpeg_stream(
+            audio_input=audio_source,
+            input_format=streamdetails.audio_format,
+            output_format=output_format,
+            extra_input_args=extra_input_args,
+        ):
+            yield chunk
+
     def _log_request(self, request: web.Request) -> None:
         """Log request."""
         if not self.logger.isEnabledFor(VERBOSE_LOG_LEVEL):
index f7f7ee64bf10116326f782fff0dec5fe6fdb0434..a737da174dc51159b7733bee46525c5ed5a63cbf 100644 (file)
@@ -123,6 +123,11 @@ class PlayerProvider(Provider):
         # will only be called for players with PLAY_ANNOUNCEMENT feature set.
         raise NotImplementedError
 
+    async def select_source(self, player_id: str, source: str) -> None:
+        """Handle SELECT SOURCE command on given player."""
+        # will only be called for sources that are defined in 'source_list'.
+        raise NotImplementedError
+
     async def cmd_power(self, player_id: str, powered: bool) -> None:
         """Send POWER command to given player.
 
index 8c0ebc5e9899c16272e739b75853345f08e95c78..f5f98c8fa49bd5fe83f2e50b164281298cce5f8a 100644 (file)
@@ -2,8 +2,17 @@
 
 from __future__ import annotations
 
+from collections.abc import AsyncGenerator
+from typing import TYPE_CHECKING
+
+from music_assistant_models.enums import MediaType
+
 from .provider import Provider
 
+if TYPE_CHECKING:
+    from music_assistant_models.player import PlayerSource
+    from music_assistant_models.streamdetails import StreamDetails
+
 # ruff: noqa: ARG001, ARG002
 
 
@@ -13,3 +22,35 @@ class PluginProvider(Provider):
 
     Plugin Provider implementations should inherit from this base model.
     """
+
+    async def get_sources(self) -> list[PlayerSource]:  # type: ignore[return]
+        """Get all audio sources provided by this provider."""
+        # Will only be called if ProviderFeature.AUDIO_SOURCE is declared
+        raise NotImplementedError
+
+    async def get_source(self, prov_source_id: str) -> PlayerSource:  # type: ignore[return]
+        """Get AudioSource details by id."""
+        # Will only be called if ProviderFeature.AUDIO_SOURCE is declared
+        raise NotImplementedError
+
+    async def get_stream_details(
+        self, item_id: str, media_type: MediaType = MediaType.OTHER
+    ) -> StreamDetails:
+        """Return the streamdetails to stream a naudiosource provided by this plugin."""
+        # Will only be called if ProviderFeature.AUDIO_SOURCE is declared
+        raise NotImplementedError
+
+    async def get_audio_stream(
+        self, streamdetails: StreamDetails, seek_position: int = 0
+    ) -> AsyncGenerator[bytes, None]:
+        """
+        Return the (custom) audio stream for the provider item.
+
+        Will only be called when the stream_type is set to CUSTOM.
+        """
+        if False:
+            yield b""
+        raise NotImplementedError
+
+    async def on_streamed(self, streamdetails: StreamDetails, seconds_streamed: int) -> None:
+        """Handle callback when an item completed streaming."""