from music_assistant.helpers.api import api_command
from music_assistant.helpers.tags import async_parse_tags
from music_assistant.helpers.throttle_retry import Throttler
-from music_assistant.helpers.uri import parse_uri
from music_assistant.helpers.util import TaskManager, get_changed_values
from music_assistant.models.core_controller import CoreController
from music_assistant.models.player_provider import PlayerProvider
- source: The ID of the source that needs to be activated/selected.
"""
player = self.get(player_id, True)
- # handle source_id from source plugin
- if "://plugin_source/" in source:
- await self._play_plugin_source(player, source)
- return
# basic check if player supports source selection
if PlayerFeature.SELECT_SOURCE not in player.supported_features:
raise UnsupportedFeaturedException(
self.logger.warning("Can not resume %s on %s", prev_item_id, player.display_name)
# TODO !!
- async def _play_plugin_source(self, player: Player, source: str) -> None:
- """Handle playback of a plugin source on the player."""
- _, provider_id, source_id = await parse_uri(source)
- if not (provider := self.mass.get_provider(provider_id)):
- raise PlayerCommandFailed(f"Invalid (plugin)source {source}")
- player_source = await provider.get_source(source_id)
- url = self.mass.streams.get_plugin_source_url(provider_id, source_id, player.player_id)
- # create a PlayerMedia object for the plugin source so
- # we can send a regular play-media call downstream
- media = player_source.metadata or PlayerMedia(
- uri=url,
- media_type=MediaType.OTHER,
- title=player_source.name,
- custom_data={"source": source},
- )
- media.uri = url
- await self.play_media(player.player_id, media)
-
async def _poll_players(self) -> None:
"""Background task that polls players for updates."""
while True: