if player.synced_to or player.active_group:
raise PlayerCommandFailed(f"Player {player.display_name} is currently grouped")
# check if player is already playing and source is different
- # in that case we to stop the player first
- if source != player.active_source and player.state != PlayerState.IDLE:
+ # in that case we need to stop the player first
+ prev_source = player.active_source
+ if prev_source and source != prev_source and player.state != PlayerState.IDLE:
await self.cmd_stop(player_id)
await asyncio.sleep(0.5) # small delay to allow stop to process
+ player.active_source = None
# check if source is a pluginsource
# in that case the source id is the lookup_key of the plugin provider
if plugin_prov := self.mass.get_provider(source):
# this can be used to restore the queue after a source switch
if mass_queue := self.mass.player_queues.get(source):
player.active_source = mass_queue.queue_id
- if mass_queue.items:
- await self.mass.player_queues.play(mass_queue.queue_id)
return
# basic check if player supports source selection
if PlayerFeature.SELECT_SOURCE not in player.supported_features:
return self._get_active_source(group_player)
# if player has plugin source active return that
for plugin_source in self._get_plugin_sources():
- if (
- plugin_source.in_use_by == player.player_id
- ) or player.active_source == plugin_source.id:
+ if player.active_source == plugin_source.id or (
+ player.current_media and plugin_source.id == player.current_media.queue_id
+ ):
# copy/set current media if available
if plugin_source.metadata:
player.set_current_media(
) -> None:
"""Handle playback/select of given plugin source on player."""
plugin_source = plugin_prov.get_source()
- if plugin_source.in_use_by and plugin_source.in_use_by != player.player_id:
+ if plugin_prov.in_use_by and plugin_prov.in_use_by != player.player_id:
raise PlayerCommandFailed(
f"Source {plugin_source.name} is already in use by another player"
)
- plugin_source.in_use_by = player.player_id
player.active_source = plugin_source.id
stream_url = self.mass.streams.get_plugin_source_url(plugin_source.id, player.player_id)
await self.play_media(
uri=stream_url,
media_type=MediaType.PLUGIN_SOURCE,
title=plugin_source.name,
+ queue_id=plugin_source.id,
custom_data={
"provider": plugin_source.id,
"audio_format": plugin_source.audio_format,
def _set_player_sources(self, player: Player) -> None:
"""Set all available player sources."""
player_source_ids = [x.id for x in player.source_list]
- for plugin_source in self._get_plugin_sources():
- if plugin_source.id in player_source_ids:
+ for plugin_prov in self.mass.get_providers(ProviderType.PLUGIN):
+ if ProviderFeature.AUDIO_SOURCE not in plugin_prov.supported_features:
+ continue
+ if plugin_prov.in_use_by and plugin_prov.in_use_by != player.player_id:
continue
- if plugin_source.passive and plugin_source.in_use_by != player.player_id:
+ plugin_source = plugin_prov.get_source()
+ if plugin_source.id in player_source_ids:
continue
player.source_list.append(plugin_source)
player_filter_params: list[str] | None = None,
) -> AsyncGenerator[bytes, None]:
"""Get the special plugin source stream."""
- provider: PluginProvider = self.mass.get_provider(plugin_source_id)
- plugin_source = provider.get_source()
- if plugin_source.in_use_by and plugin_source.in_use_by != player_id:
+ player = self.mass.players.get(player_id)
+ plugin_prov: PluginProvider = self.mass.get_provider(plugin_source_id)
+ plugin_source = plugin_prov.get_source()
+ if plugin_prov.in_use_by and plugin_prov.in_use_by != player_id:
raise RuntimeError(
- f"PluginSource plugin_source.name is already in use by {plugin_source.in_use_by}"
+ f"PluginSource plugin_source.name is already in use by {plugin_prov.in_use_by}"
)
-
audio_input = (
- provider.get_audio_stream(player_id)
+ plugin_prov.get_audio_stream(player_id)
if plugin_source.stream_type == StreamType.CUSTOM
else plugin_source.path
)
chunk_size = int(get_chunksize(output_format, 1) / 10)
- try:
- plugin_source.in_use_by = player_id
- async for chunk in get_ffmpeg_stream(
- audio_input=audio_input,
- input_format=plugin_source.audio_format,
- output_format=output_format,
- chunk_size=chunk_size,
- filter_params=player_filter_params,
- extra_input_args=["-re"],
- ):
- yield chunk
- finally:
- plugin_source.in_use_by = None
+ player.active_source = plugin_source_id
+ async for chunk in get_ffmpeg_stream(
+ audio_input=audio_input,
+ input_format=plugin_source.audio_format,
+ output_format=output_format,
+ chunk_size=chunk_size,
+ filter_params=player_filter_params,
+ extra_input_args=["-re"],
+ ):
+ yield chunk
async def get_queue_item_stream(
self,
repr=False,
)
- # use this if the plugin can only provide a source to a single player at a time
- in_use_by: str | None = field(
- default=None,
- compare=False,
- metadata=field_options(serialize="omit", deserialize=pass_through),
- repr=False,
- )
-
# metadata of the current playing media (if known)
metadata: PlayerMedia | None = field(
default=None,
Plugin Provider implementations should inherit from this base model.
"""
+ @property
+ def in_use_by(self) -> str | None:
+ """Return player id that is currently using this plugin (if any)."""
+ for player in self.mass.players:
+ if player.active_source == self.lookup_key:
+ return player.player_id
+ return None
+
def get_source(self) -> PluginSource: # type: ignore[return]
"""Get (audio)source details for this plugin."""
# Will only be called if ProviderFeature.AUDIO_SOURCE is declared
PlayerState,
PlayerType,
ProviderFeature,
+ StreamType,
)
from music_assistant_models.media_items import AudioFormat
from music_assistant_models.player import DeviceInfo, Player, PlayerMedia
from music_assistant.helpers.ffmpeg import get_ffmpeg_stream
from music_assistant.helpers.util import TaskManager, get_ip_pton, lock, select_free_port
from music_assistant.models.player_provider import PlayerProvider
+from music_assistant.models.plugin import PluginProvider
from music_assistant.providers.airplay.raop import RaopStreamSession
from music_assistant.providers.player_group import PlayerGroupProvider
)
elif media.media_type == MediaType.PLUGIN_SOURCE:
# special case: plugin source stream
- input_format = AIRPLAY_PCM_FORMAT
+ # consume the stream directly, so we can skip one step in between
assert media.custom_data is not None # for type checking
- audio_source = self.mass.streams.get_plugin_source_stream(
- plugin_source_id=media.custom_data["provider"],
- output_format=AIRPLAY_PCM_FORMAT,
- player_id=player_id,
+ provider = cast(PluginProvider, self.mass.get_provider(media.custom_data["provider"]))
+ plugin_source = provider.get_source()
+ assert plugin_source.audio_format is not None # for type checking
+ input_format = plugin_source.audio_format
+ audio_source = (
+ provider.get_audio_stream(player_id) # type: ignore[assignment]
+ if plugin_source.stream_type == StreamType.CUSTOM
+ else cast(str, plugin_source.path)
)
elif media.queue_id and media.queue_id.startswith("ugp_"):
# special case: UGP stream
PlayerState,
PlayerType,
ProviderFeature,
+ StreamType,
)
from music_assistant_models.errors import SetupFailedError
from music_assistant_models.media_items import AudioFormat
from music_assistant.helpers.process import AsyncProcess, check_output
from music_assistant.helpers.util import get_ip_pton
from music_assistant.models.player_provider import PlayerProvider
+from music_assistant.models.plugin import PluginProvider
if TYPE_CHECKING:
from music_assistant_models.config_entries import ProviderConfig
if stream_task := self._stream_tasks.pop(player_id, None):
if not stream_task.done():
stream_task.cancel()
+ with suppress(asyncio.CancelledError):
+ await stream_task
+ # assign default/empty stream to the player
+ await self._get_snapgroup(player_id).set_stream("default")
+ await asyncio.sleep(0.5)
player.state = PlayerState.IDLE
+ player.current_media = None
+ player.active_source = None
self._set_childs_state(player_id)
self.mass.players.update(player_id)
- # assign default/empty stream to the player
- await self._get_snapgroup(player_id).set_stream("default")
async def cmd_volume_mute(self, player_id: str, muted: bool) -> None:
"""Send MUTE command to given player."""
elif media.media_type == MediaType.PLUGIN_SOURCE:
# special case: plugin source stream
# consume the stream directly, so we can skip one step in between
- # provider: PluginProvider = self.mass.get_provider(media.custom_data["provider"])
- # plugin_source = provider.get_source()
- # audio_source = (
- # provider.get_audio_stream(player_id)
- # if plugin_source.stream_type == StreamType.CUSTOM
- # else plugin_source.path
- # )
- input_format = DEFAULT_SNAPCAST_FORMAT
- audio_source = self.mass.streams.get_plugin_source_stream(
- plugin_source_id=media.custom_data["provider"],
- output_format=DEFAULT_SNAPCAST_FORMAT,
- player_id=player_id,
+ assert media.custom_data is not None # for type checking
+ provider = cast(PluginProvider, self.mass.get_provider(media.custom_data["provider"]))
+ plugin_source = provider.get_source()
+ assert plugin_source.audio_format is not None # for type checking
+ input_format = plugin_source.audio_format
+ audio_source = (
+ provider.get_audio_stream(player_id)
+ if plugin_source.stream_type == StreamType.CUSTOM
+ else plugin_source.path
)
elif media.queue_id.startswith("ugp_"):
# special case: UGP stream
self.mass, player_id, input_format, DEFAULT_SNAPCAST_FORMAT
),
audio_output=stream_path,
+ extra_input_args=["-re"],
) as ffmpeg_proc:
player.state = PlayerState.PLAYING
player.current_media = media
import os
import pathlib
from collections.abc import Callable
+from contextlib import suppress
from typing import TYPE_CHECKING, cast
from aiohttp.web import Response
self._stop_called = True
if self._runner_task and not self._runner_task.done():
self._runner_task.cancel()
+ with suppress(asyncio.CancelledError):
+ await self._runner_task
for callback in self._on_unload_callbacks:
callback()
self.logger.info("Starting Spotify Connect background daemon")
os.environ["MASS_CALLBACK"] = f"{self.mass.streams.base_url}/{self.instance_id}"
await check_output("rm", "-f", self.named_pipe)
- await asyncio.sleep(0.5)
+ await asyncio.sleep(0.1)
await check_output("mkfifo", self.named_pipe)
- await asyncio.sleep(0.5)
+ await asyncio.sleep(0.1)
try:
args: list[str] = [
self._librespot_bin,
if "couldn't parse packet from " in line:
continue
self.logger.debug(line)
-
- except asyncio.CancelledError:
- await librespot.close(True)
finally:
+ await librespot.close(True)
self.logger.info("Spotify Connect background daemon stopped for %s", self.name)
await check_output("rm", "-f", self.named_pipe)
# auto restart if not stopped manually
# handle session connected event
# this player has become the active spotify connect player
# we need to start the playback
- self.logger.error("%s - %s", self.name, json_data.get("event"))
- if not self._source_details.in_use_by and json_data.get("event") in (
- # "session_connected",
- # "loading",
- "sink",
+ if json_data.get("event") in ("sink",) and (
+ not self.in_use_by
+ or ((player := self.mass.players.get(self.in_use_by)) and player.state == "idle")
):
# initiate playback by selecting this source on the default player
self.mass.create_task(