from music_assistant.helpers.api import api_command
from music_assistant.helpers.tags import parse_tags
from music_assistant.helpers.throttle_retry import Throttler
+from music_assistant.helpers.uri import parse_uri
from music_assistant.helpers.util import TaskManager, get_changed_values, lock
from music_assistant.models.core_controller import CoreController
from music_assistant.models.player_provider import PlayerProvider
async with self._player_throttlers[player_id]:
await player_prov.enqueue_next_media(player_id=player_id, media=media)
+ async def select_source(self, player_id: str, source: str) -> None:
+ """
+ Handle SELECT SOURCE command on given player.
+
+ - player_id: player_id of the player to handle the command.
+ - source: The ID of the source that needs to be activated/selected.
+ """
+ player = self.get(player_id, True)
+ # handle source_id from source plugin
+ if "://plugin_source/" in source:
+ await self._play_plugin_source(player, source)
+ return
+ # basic check if player supports source selection
+ if PlayerFeature.SELECT_SOURCE not in player.supported_features:
+ raise UnsupportedFeaturedException(
+ f"Player {player.display_name} does not support source selection"
+ )
+ # basic check if source is valid for player
+ if not any(x for x in player.source_list if x.id == source):
+ raise PlayerCommandFailed(
+ f"{source} is an invalid source for player {player.display_name}"
+ )
+ # forward to player provider
+ provider = self.mass.get_provider(player.provider)
+ await provider.select_source(player_id, source)
+
@api_command("players/cmd/group")
@handle_player_command
async def cmd_group(self, player_id: str, target_player: str) -> None:
self.logger.warning("Can not resume %s on %s", prev_item_id, player.display_name)
# TODO !!
+ async def _play_plugin_source(self, player: Player, source: str) -> None:
+ """Handle playback of a plugin source on the player."""
+ _, provider_id, source_id = await parse_uri(source)
+ if not (provider := self.mass.get_provider(provider_id)):
+ raise PlayerCommandFailed(f"Invalid (plugin)source {source}")
+ player_source = await provider.get_source(source_id)
+ url = self.mass.streams.get_plugin_source_url(provider_id, source_id, player.player_id)
+ # create a PlayerMedia object for the plugin source so
+ # we can send a regular play-media call downstream
+ media = player_source.metadata or PlayerMedia(
+ uri=url,
+ media_type=MediaType.OTHER,
+ title=player_source.name,
+ custom_data={"source": source},
+ )
+ await self.play_media(player.player_id, media)
+
async def _poll_players(self) -> None:
"""Background task that polls players for updates."""
while True:
from music_assistant.helpers.util import get_ip, get_ips, select_free_port, try_parse_bool
from music_assistant.helpers.webserver import Webserver
from music_assistant.models.core_controller import CoreController
+from music_assistant.models.plugin import PluginProvider
if TYPE_CHECKING:
from music_assistant_models.config_entries import CoreConfig
"/announcement/{player_id}.{fmt}",
self.serve_announcement_stream,
),
+ (
+ "*",
+ "/pluginsource/{provider_id}/{source_id}/{player_id}.{fmt}",
+ self.serve_plugin_source_stream,
+ ),
],
)
return resp
+ async def serve_plugin_source_stream(self, request: web.Request) -> web.Response:
+ """Stream PluginSource audio to a player."""
+ self._log_request(request)
+ provider_id = request.match_info["provider_id"]
+ provider: PluginProvider | None
+ if not (provider := self.mass.get_provider(provider_id)):
+ raise web.HTTPNotFound(reason=f"Unknown Provider: {provider_id}")
+ source_id = request.match_info["source_id"]
+ if not (source := await provider.get_source(source_id)):
+ raise web.HTTPNotFound(reason=f"Unknown PluginSource: {source_id}")
+ try:
+ streamdetails = await provider.get_stream_details(source_id, MediaType.PLUGIN_SOURCE)
+ except Exception:
+ err_msg = f"No streamdetails for PluginSource: {source_id}"
+ self.logger.error(err_msg)
+ raise web.HTTPNotFound(reason=err_msg)
+
+ # work out output format/details
+ player_id = request.match_info["player_id"]
+ player = self.mass.players.get(player_id)
+ if not player:
+ raise web.HTTPNotFound(reason=f"Unknown Player: {player_id}")
+ output_format = await self._get_output_format(
+ output_format_str=request.match_info["fmt"],
+ player=player,
+ default_sample_rate=streamdetails.audio_format.sample_rate,
+ default_bit_depth=streamdetails.audio_format.bit_depth,
+ )
+
+ # prepare request, add some DLNA/UPNP compatible headers
+ headers = {
+ **DEFAULT_STREAM_HEADERS,
+ "icy-name": source.name,
+ }
+ resp = web.StreamResponse(
+ status=200,
+ reason="OK",
+ headers=headers,
+ )
+ resp.content_type = f"audio/{output_format.output_format_str}"
+ http_profile: str = await self.mass.config.get_player_config_value(
+ player_id, CONF_HTTP_PROFILE
+ )
+ if http_profile == "forced_content_length" and streamdetails.duration:
+ # guess content length based on duration
+ resp.content_length = get_chunksize(output_format, streamdetails.duration)
+ elif http_profile == "chunked":
+ resp.enable_chunked_encoding()
+
+ await resp.prepare(request)
+
+ # return early if this is not a GET request
+ if request.method != "GET":
+ return resp
+
+ # all checks passed, start streaming!
+ self.logger.debug(
+ "Start serving audio stream for PluginSource %s (%s) to %s",
+ source.name,
+ source.id,
+ player.display_name,
+ )
+ async for chunk in self.get_plugin_source_stream(
+ streamdetails,
+ output_format=output_format,
+ ):
+ try:
+ await resp.write(chunk)
+ except (BrokenPipeError, ConnectionResetError, ConnectionError):
+ break
+ if streamdetails.stream_error:
+ self.logger.error(
+ "Error streaming PluginSource %s (%s) to %s",
+ source.name,
+ source.uri,
+ player.display_name,
+ )
+ return resp
+
def get_command_url(self, player_or_queue_id: str, command: str) -> str:
"""Get the url for the special command stream."""
return f"{self.base_url}/command/{player_or_queue_id}/{command}.mp3"
# like https hosts and it also offers the pre-announce 'bell'
return f"{self.base_url}/announcement/{player_id}.{content_type.value}?pre_announce={use_pre_announce}" # noqa: E501
+ def get_plugin_source_url(
+ self,
+ provider: str,
+ source_id: str,
+ player_id: str,
+ output_codec: ContentType = ContentType.FLAC,
+ ) -> str:
+ """Get the url for the Plugin Source stream/proxy."""
+ fmt = output_codec.value
+ # handle raw pcm without exact format specifiers
+ if output_codec.is_pcm() and ";" not in fmt:
+ fmt += f";codec=pcm;rate={44100};bitrate={16};channels={2}"
+ return f"{self._server.base_url}/pluginsource/{provider}/{source_id}/{player_id}.{fmt}"
+
async def get_flow_stream(
self,
queue: PlayerQueue,
):
yield chunk
+ async def get_plugin_source_stream(
+ self,
+ streamdetails: StreamDetails,
+ output_format: AudioFormat,
+ ) -> AsyncGenerator[tuple[bool, bytes], None]:
+ """Get the audio stream for a PluginSource."""
+ streamdetails.seek_position = 0
+ extra_input_args = ["-re"]
+ # work out audio source for these streamdetails
+ if streamdetails.stream_type == StreamType.CUSTOM:
+ audio_source = self.mass.get_provider(streamdetails.provider).get_audio_stream(
+ streamdetails,
+ seek_position=streamdetails.seek_position,
+ )
+ elif streamdetails.stream_type == StreamType.HLS:
+ substream = await get_hls_substream(self.mass, streamdetails.path)
+ audio_source = substream.path
+ else:
+ audio_source = streamdetails.path
+
+ # add support for decryption key provided in streamdetails
+ if streamdetails.decryption_key:
+ extra_input_args += ["-decryption_key", streamdetails.decryption_key]
+
+ async for chunk in get_ffmpeg_stream(
+ audio_input=audio_source,
+ input_format=streamdetails.audio_format,
+ output_format=output_format,
+ extra_input_args=extra_input_args,
+ ):
+ yield chunk
+
def _log_request(self, request: web.Request) -> None:
"""Log request."""
if not self.logger.isEnabledFor(VERBOSE_LOG_LEVEL):
from __future__ import annotations
+from collections.abc import AsyncGenerator
+from typing import TYPE_CHECKING
+
+from music_assistant_models.enums import MediaType
+
from .provider import Provider
+if TYPE_CHECKING:
+ from music_assistant_models.player import PlayerSource
+ from music_assistant_models.streamdetails import StreamDetails
+
# ruff: noqa: ARG001, ARG002
Plugin Provider implementations should inherit from this base model.
"""
+
+ async def get_sources(self) -> list[PlayerSource]: # type: ignore[return]
+ """Get all audio sources provided by this provider."""
+ # Will only be called if ProviderFeature.AUDIO_SOURCE is declared
+ raise NotImplementedError
+
+ async def get_source(self, prov_source_id: str) -> PlayerSource: # type: ignore[return]
+ """Get AudioSource details by id."""
+ # Will only be called if ProviderFeature.AUDIO_SOURCE is declared
+ raise NotImplementedError
+
+ async def get_stream_details(
+ self, item_id: str, media_type: MediaType = MediaType.OTHER
+ ) -> StreamDetails:
+ """Return the streamdetails to stream a naudiosource provided by this plugin."""
+ # Will only be called if ProviderFeature.AUDIO_SOURCE is declared
+ raise NotImplementedError
+
+ async def get_audio_stream(
+ self, streamdetails: StreamDetails, seek_position: int = 0
+ ) -> AsyncGenerator[bytes, None]:
+ """
+ Return the (custom) audio stream for the provider item.
+
+ Will only be called when the stream_type is set to CUSTOM.
+ """
+ if False:
+ yield b""
+ raise NotImplementedError
+
+ async def on_streamed(self, streamdetails: StreamDetails, seconds_streamed: int) -> None:
+ """Handle callback when an item completed streaming."""