Handle playback of plugin source as player source and implement this in Spotify Connect
queue.index_in_buffer = index
queue.flow_mode_stream_log = []
queue.flow_mode = await self.mass.config.get_player_config_value(queue_id, CONF_FLOW_MODE)
- # no point in enabled flow mode for radio or plugin sources
- if queue_item.media_type in (MediaType.RADIO, MediaType.PLUGIN_SOURCE):
+ # no point in enabling flow mode for radio sources
+ if queue_item.media_type == MediaType.RADIO:
queue.flow_mode = False
queue.current_item = queue_item
PlayerFeature,
PlayerState,
PlayerType,
+ ProviderFeature,
ProviderType,
)
from music_assistant_models.errors import (
from music_assistant.helpers.util import TaskManager, get_changed_values
from music_assistant.models.core_controller import CoreController
from music_assistant.models.player_provider import PlayerProvider
+from music_assistant.models.plugin import PluginProvider, PluginSource
if TYPE_CHECKING:
from collections.abc import Awaitable, Callable, Coroutine, Iterator
- source: The ID of the source that needs to be activated/selected.
"""
player = self.get(player_id, True)
+ if player.synced_to or player.active_group:
+ raise PlayerCommandFailed(f"Player {player.display_name} is currently grouped")
+ # check if player is already playing and source is different
+ # in that case we to stop the player first
+ if source != player.active_source and player.state != PlayerState.IDLE:
+ await self.cmd_stop(player_id)
+ await asyncio.sleep(0.5) # small delay to allow stop to process
+ # check if source is a pluginsource
+ # in that case the source id is the lookup_key of the plugin provider
+ if plugin_prov := self.mass.get_provider(source):
+ await self._handle_select_plugin_source(player, plugin_prov)
+ return
+ # check if source is a mass queue
+ # this can be used to restore the queue after a source switch
+ if mass_queue := self.mass.player_queues.get(source):
+ player.active_source = mass_queue.queue_id
+ if mass_queue.items:
+ await self.mass.player_queues.play(mass_queue.queue_id)
+ return
# basic check if player supports source selection
if PlayerFeature.SELECT_SOURCE not in player.supported_features:
raise UnsupportedFeaturedException(
player = self._players[player_id]
prev_state = self._prev_states.get(player_id, {})
player.active_source = self._get_active_source(player)
+ # set player sources
+ self._set_player_sources(player)
# prefer any overridden name from config
player.display_name = (
self.mass.config.get_raw_player_config_value(player.player_id, "name")
# if player has group active, return those details
if player.active_group and (group_player := self.get(player.active_group)):
return self._get_active_source(group_player)
+ # if player has plugin source active return that
+ for plugin_source in self._get_plugin_sources():
+ if (
+ plugin_source.in_use_by == player.player_id
+ ) or player.active_source == plugin_source.id:
+ # copy/set current media if available
+ if plugin_source.metadata:
+ player.set_current_media(
+ uri=plugin_source.metadata.uri,
+ media_type=plugin_source.metadata.media_type,
+ title=plugin_source.metadata.title,
+ artist=plugin_source.metadata.artist,
+ album=plugin_source.metadata.album,
+ image_url=plugin_source.metadata.image_url,
+ duration=plugin_source.metadata.duration,
+ )
+ return plugin_source.id
# defaults to the player's own player id if no active source set
return player.active_source or player.player_id
# always update player state
self.mass.loop.call_soon(self.update, player_id)
await asyncio.sleep(1)
+
+ async def _handle_select_plugin_source(
+ self, player: Player, plugin_prov: PluginProvider
+ ) -> None:
+ """Handle playback/select of given plugin source on player."""
+ plugin_source = plugin_prov.get_source()
+ if plugin_source.in_use_by and plugin_source.in_use_by != player.player_id:
+ raise PlayerCommandFailed(
+ f"Source {plugin_source.name} is already in use by another player"
+ )
+ plugin_source.in_use_by = player.player_id
+ player.active_source = plugin_source.id
+ stream_url = self.mass.streams.get_plugin_source_url(plugin_source.id, player.player_id)
+ await self.play_media(
+ player_id=player.player_id,
+ media=PlayerMedia(
+ uri=stream_url,
+ media_type=MediaType.PLUGIN_SOURCE,
+ title=plugin_source.name,
+ custom_data={
+ "provider": plugin_source.id,
+ "audio_format": plugin_source.audio_format,
+ },
+ ),
+ )
+
+ def _get_plugin_sources(self) -> list[PluginSource]:
+ """Return all available plugin sources."""
+ return [
+ plugin_prov.get_source()
+ for plugin_prov in self.mass.get_providers(ProviderType.PLUGIN)
+ if ProviderFeature.AUDIO_SOURCE in plugin_prov.supported_features
+ ]
+
+ def _set_player_sources(self, player: Player) -> None:
+ """Set all available player sources."""
+ player_source_ids = [x.id for x in player.source_list]
+ for plugin_source in self._get_plugin_sources():
+ if plugin_source.id in player_source_ids:
+ continue
+ if plugin_source.passive and plugin_source.in_use_by != player.player_id:
+ continue
+ player.source_list.append(plugin_source)
from music_assistant.helpers.util import get_ip, get_ips, select_free_port, try_parse_bool
from music_assistant.helpers.webserver import Webserver
from music_assistant.models.core_controller import CoreController
+from music_assistant.models.plugin import PluginProvider
if TYPE_CHECKING:
from music_assistant_models.config_entries import CoreConfig
from music_assistant_models.player import Player
from music_assistant_models.player_queue import PlayerQueue
from music_assistant_models.queue_item import QueueItem
- from music_assistant_models.streamdetails import StreamDetails
isfile = wrap(os.path.isfile)
"/announcement/{player_id}.{fmt}",
self.serve_announcement_stream,
),
+ (
+ "*",
+ "/pluginsource/{plugin_source}/{player_id}.{fmt}",
+ self.serve_plugin_source_stream,
+ ),
],
)
headers = {
**DEFAULT_STREAM_HEADERS,
"icy-name": queue_item.name,
+ "Accept-Ranges": "none",
+ "Content-Type": f"audio/{output_format.output_format_str}",
}
resp = web.StreamResponse(
status=200,
self.mass.player_queues.track_loaded_in_buffer(queue_id, queue_item_id)
async for chunk in get_ffmpeg_stream(
- audio_input=self.get_media_stream(
- streamdetails=queue_item.streamdetails,
+ audio_input=self.get_queue_item_stream(
+ queue_item=queue_item,
pcm_format=pcm_format,
),
input_format=pcm_format,
self.logger.debug("Start serving Queue flow audio stream for %s", queue.display_name)
async for chunk in get_ffmpeg_stream(
- audio_input=self.get_flow_stream(
+ audio_input=self.get_queue_flow_stream(
queue=queue,
start_queue_item=start_queue_item,
pcm_format=flow_pcm_format,
return resp
+ async def serve_plugin_source_stream(self, request: web.Request) -> web.Response:
+ """Stream PluginSource audio to a player."""
+ self._log_request(request)
+ plugin_source_id = request.match_info["plugin_source"]
+ provider: PluginProvider | None
+ if not (provider := self.mass.get_provider(plugin_source_id)):
+ raise web.HTTPNotFound(reason=f"Unknown PluginSource: {plugin_source_id}")
+ # work out output format/details
+ player_id = request.match_info["player_id"]
+ player = self.mass.players.get(player_id)
+ if not player:
+ raise web.HTTPNotFound(reason=f"Unknown Player: {player_id}")
+ plugin_source = provider.get_source()
+ output_format = await self.get_output_format(
+ output_format_str=request.match_info["fmt"],
+ player=player,
+ default_sample_rate=plugin_source.audio_format.sample_rate,
+ default_bit_depth=plugin_source.audio_format.bit_depth,
+ )
+ headers = {
+ **DEFAULT_STREAM_HEADERS,
+ "icy-name": plugin_source.name,
+ "Accept-Ranges": "none",
+ "Content-Type": f"audio/{output_format.output_format_str}",
+ }
+ resp = web.StreamResponse(
+ status=200,
+ reason="OK",
+ headers=headers,
+ )
+ resp.content_type = f"audio/{output_format.output_format_str}"
+ http_profile: str = await self.mass.config.get_player_config_value(
+ player_id, CONF_HTTP_PROFILE
+ )
+ if http_profile == "forced_content_length":
+ # guess content length based on duration
+ resp.content_length = get_chunksize(output_format, 12 * 3600)
+ elif http_profile == "chunked":
+ resp.enable_chunked_encoding()
+
+ await resp.prepare(request)
+
+ # return early if this is not a GET request
+ if request.method != "GET":
+ return resp
+
+ # all checks passed, start streaming!
+ self.logger.debug(
+ "Start serving audio stream for PluginSource %s (%s) to %s",
+ plugin_source.name,
+ plugin_source.id,
+ player.display_name,
+ )
+ async for chunk in self.get_plugin_source_stream(
+ plugin_source_id=plugin_source_id,
+ output_format=output_format,
+ player_id=player_id,
+ player_filter_params=get_player_filter_params(
+ self.mass, player_id, plugin_source.audio_format, output_format
+ ),
+ ):
+ try:
+ await resp.write(chunk)
+ except (BrokenPipeError, ConnectionResetError, ConnectionError):
+ break
+ return resp
+
def get_command_url(self, player_or_queue_id: str, command: str) -> str:
"""Get the url for the special command stream."""
return f"{self.base_url}/command/{player_or_queue_id}/{command}.mp3"
def get_plugin_source_url(
self,
- provider: str,
- source_id: str,
+ plugin_source: str,
player_id: str,
output_codec: ContentType = ContentType.FLAC,
) -> str:
# handle raw pcm without exact format specifiers
if output_codec.is_pcm() and ";" not in fmt:
fmt += f";codec=pcm;rate={44100};bitrate={16};channels={2}"
- return f"{self._server.base_url}/pluginsource/{provider}/{source_id}/{player_id}.{fmt}"
+ return f"{self._server.base_url}/pluginsource/{plugin_source}/{player_id}.{fmt}"
- async def get_flow_stream(
+ async def get_queue_flow_stream(
self,
queue: PlayerQueue,
start_queue_item: QueueItem,
bytes_written = 0
buffer = b""
# handle incoming audio chunks
- async for chunk in self.get_media_stream(
- queue_track.streamdetails,
+ async for chunk in self.get_queue_item_stream(
+ queue_track,
pcm_format=pcm_format,
):
# buffer size needs to be big enough to include the crossfade part
):
yield chunk
- async def get_media_stream(
+ async def get_plugin_source_stream(
+ self,
+ plugin_source_id: str,
+ output_format: AudioFormat,
+ player_id: str,
+ player_filter_params: list[str] | None = None,
+ ) -> AsyncGenerator[bytes, None]:
+ """Get the special plugin source stream."""
+ provider: PluginProvider = self.mass.get_provider(plugin_source_id)
+ plugin_source = provider.get_source()
+ if plugin_source.in_use_by and plugin_source.in_use_by != player_id:
+ raise RuntimeError(
+ f"PluginSource plugin_source.name is already in use by {plugin_source.in_use_by}"
+ )
+
+ audio_input = (
+ provider.get_audio_stream(player_id)
+ if plugin_source.stream_type == StreamType.CUSTOM
+ else plugin_source.path
+ )
+ chunk_size = int(get_chunksize(output_format, 1) / 10)
+ try:
+ plugin_source.in_use_by = player_id
+ async for chunk in get_ffmpeg_stream(
+ audio_input=audio_input,
+ input_format=plugin_source.audio_format,
+ output_format=output_format,
+ chunk_size=chunk_size,
+ filter_params=player_filter_params,
+ extra_input_args=["-re"],
+ ):
+ yield chunk
+ finally:
+ plugin_source.in_use_by = None
+
+ async def get_queue_item_stream(
self,
- streamdetails: StreamDetails,
+ queue_item: QueueItem,
pcm_format: AudioFormat,
- ) -> AsyncGenerator[tuple[bool, bytes], None]:
- """Get the audio stream for the given streamdetails as raw pcm chunks."""
+ ) -> AsyncGenerator[bytes, None]:
+ """Get the audio stream for a single queue item as raw PCM audio."""
# collect all arguments for ffmpeg
+ streamdetails = queue_item.streamdetails
+ assert streamdetails
filter_params = []
extra_input_args = streamdetails.extra_input_args or []
# handle volume normalization
collect_log_history=True,
loglevel="debug" if LOGGER.isEnabledFor(VERBOSE_LOG_LEVEL) else "info",
)
+
try:
await ffmpeg_proc.start()
logger.debug(
VolumeNormalizationMode.FIXED_GAIN,
)
and (finished or (seconds_streamed >= 30))
- and streamdetails.media_type != MediaType.PLUGIN_SOURCE
):
# dynamic mode not allowed and no measurement known, we need to analyze the audio
# add background task to start analyzing the audio
from music_assistant.constants import VERBOSE_LOG_LEVEL
from .process import AsyncProcess, check_output
-from .util import TimedAsyncGenerator, close_async_generator
+from .util import close_async_generator
if TYPE_CHECKING:
from music_assistant_models.media_items import AudioFormat
# for data to arrive (e.g. when there is X amount of seconds in the buffer)
# so this timeout is just to catch if the source is stuck and rpeort it and not
# to recover from it.
- async for chunk in TimedAsyncGenerator(self.audio_input, timeout=300):
+ # async for chunk in TimedAsyncGenerator(self.audio_input, timeout=300):
+ # if self.closed:
+ # return
+ # await self.write(chunk)
+ async for chunk in self.audio_input:
if self.closed:
return
await self.write(chunk)
"-ac",
str(output_format.channels),
]
+ if not output_format.content_type.is_pcm() and output_format.content_type.is_lossless():
+ output_args += ["-sample_fmt", f"s{output_format.bit_depth}"]
if output_format.output_format_str == "flac":
# use level 0 compression for fastest encoding
output_args += ["-compression_level", "0"]
"Please install ffmpeg on your OS to enable playback."
)
if returncode != 0:
- raise AudioError(
- "Error determining FFmpeg version on your system."
- "Your CPU may be too old to run this version of FFmpeg."
- f"Additional info: {returncode} {output.decode().strip()}"
- )
+ err_msg = "Error determining FFmpeg version on your system."
+ if returncode < 0:
+ # error below 0 is often illegal instruction
+ err_msg += " - Your CPU may be too old to run this version of FFmpeg."
+ err_msg += f" - Additional info: {returncode} {output.decode().strip()}"
+ raise AudioError(err_msg)
# parse version number from output
try:
version = output.decode().split("ffmpeg version ")[1].split(" ")[0].split("-")[0]
listdir = wrap(os.listdir)
rename = wrap(os.rename)
-EventCallBackType = Callable[[MassEvent], None] | Coroutine[MassEvent, Any, None]
+EventCallBackType = Callable[[MassEvent], None] | Callable[[MassEvent], Coroutine[Any, Any, None]]
EventSubscriptionType = tuple[
EventCallBackType, tuple[EventType, ...] | None, tuple[str, ...] | None
]
continue
if asyncio.iscoroutinefunction(cb_func):
if TYPE_CHECKING:
- cb_func = cast(Coroutine[Any, Any, None], cb_func)
+ cb_func = cast(Callable[[MassEvent], Coroutine[Any, Any, None]], cb_func)
self.create_task(cb_func, event_obj)
else:
if TYPE_CHECKING:
def create_task(
self,
- target: Coroutine[Any, Any, _R] | Awaitable[_R],
+ target: Callable[[MassEvent], Coroutine[Any, Any, None]] | Awaitable[_R],
*args: Any,
task_id: str | None = None,
abort_existing: bool = False,
from __future__ import annotations
from collections.abc import AsyncGenerator
-from typing import TYPE_CHECKING
+from dataclasses import dataclass, field
-from .provider import Provider
+from mashumaro import field_options, pass_through
+from music_assistant_models.enums import StreamType
+from music_assistant_models.player import PlayerMedia, PlayerSource
+from music_assistant_models.streamdetails import AudioFormat # noqa: TC002
-if TYPE_CHECKING:
- from music_assistant_models.enums import MediaType
- from music_assistant_models.media_items import PluginSource
- from music_assistant_models.streamdetails import StreamDetails
+from .provider import Provider
# ruff: noqa: ARG001, ARG002
+@dataclass()
+class PluginSource(PlayerSource):
+ """
+ Model for a PluginSource, which is a player (audio)source provided by a plugin.
+
+ This (intermediate) model is not exposed on the api,
+ but is used internally by the plugin provider.
+ """
+
+ # The output format that is sent to the player
+ # (or to the library/application that is used to send audio to the player)
+ audio_format: AudioFormat | None = field(
+ default=None,
+ compare=False,
+ metadata=field_options(serialize="omit", deserialize=pass_through),
+ repr=False,
+ )
+
+ # use this if the plugin can only provide a source to a single player at a time
+ in_use_by: str | None = field(
+ default=None,
+ compare=False,
+ metadata=field_options(serialize="omit", deserialize=pass_through),
+ repr=False,
+ )
+
+ # metadata of the current playing media (if known)
+ metadata: PlayerMedia | None = field(
+ default=None,
+ compare=False,
+ metadata=field_options(serialize="omit", deserialize=pass_through),
+ repr=False,
+ )
+
+ # The type of stream that is provided by this source
+ stream_type: StreamType | None = field(
+ default=StreamType.CUSTOM,
+ compare=False,
+ metadata=field_options(serialize="omit", deserialize=pass_through),
+ repr=False,
+ )
+
+ # The path to the source/audio (if streamtype is not custom)
+ path: str | None = field(
+ default=None,
+ compare=False,
+ metadata=field_options(serialize="omit", deserialize=pass_through),
+ repr=False,
+ )
+
+
class PluginProvider(Provider):
"""
Base representation of a Plugin for Music Assistant.
Plugin Provider implementations should inherit from this base model.
"""
- async def get_sources(self) -> list[PluginSource]: # type: ignore[return]
- """Get all audio sources provided by this provider."""
- # Will only be called if ProviderFeature.AUDIO_SOURCE is declared
- raise NotImplementedError
-
- async def get_source(self, prov_source_id: str) -> PluginSource: # type: ignore[return]
- """Get AudioSource details by id."""
+ def get_source(self) -> PluginSource: # type: ignore[return]
+ """Get (audio)source details for this plugin."""
# Will only be called if ProviderFeature.AUDIO_SOURCE is declared
raise NotImplementedError
- async def get_stream_details(self, item_id: str, media_type: MediaType) -> StreamDetails:
- """Return the streamdetails to stream an (audio)source provided by this plugin."""
- # Will only be called if ProviderFeature.AUDIO_SOURCE is declared
- raise NotImplementedError
-
- async def get_audio_stream(
- self, streamdetails: StreamDetails, seek_position: int = 0
- ) -> AsyncGenerator[bytes, None]:
+ async def get_audio_stream(self, player_id: str) -> AsyncGenerator[bytes, None]:
"""
- Return the (custom) audio stream for the provider item.
+ Return the (custom) audio stream for the audio source provided by this plugin.
+
+ Will only be called if this plugin is a PluginSource, meaning that
+ the ProviderFeature.AUDIO_SOURCE is declared AND if the streamtype is StreamType.CUSTOM.
- Will only be called when the stream_type is set to CUSTOM.
+ The player_id is the id of the player that is requesting the stream.
"""
if False:
yield b""
raise NotImplementedError
-
- async def on_streamed(
- self,
- streamdetails: StreamDetails,
- ) -> None:
- """
- Handle callback when given streamdetails completed streaming.
-
- To get the number of seconds streamed, see streamdetails.seconds_streamed.
- To get the number of seconds seeked/skipped, see streamdetails.seek_position.
- Note that seconds_streamed is the total streamed seconds, so without seeked time.
-
- NOTE: Due to internal and player buffering,
- this may be called in advance of the actual completion.
- """
"""Return the features supported by this Provider."""
# MANDATORY
# you should return a set of provider-level features
- # here that your player provider supports or an empty tuple if none.
+ # here that your player provider supports or an empty set if none.
# for example 'ProviderFeature.SYNC_PLAYERS' if you can sync players.
return {ProviderFeature.SYNC_PLAYERS}
from typing import TYPE_CHECKING
from music_assistant_models.enums import ContentType, EventType, ProviderFeature
-from music_assistant_models.streamdetails import AudioFormat
+from music_assistant_models.media_items.audio_format import AudioFormat
from music_assistant.models.plugin import PluginProvider, PluginSource
output_format=AIRPLAY_PCM_FORMAT,
use_pre_announce=media.custom_data["use_pre_announce"],
)
+ elif media.media_type == MediaType.PLUGIN_SOURCE:
+ # special case: plugin source stream
+ input_format = AIRPLAY_PCM_FORMAT
+ assert media.custom_data is not None # for type checking
+ audio_source = self.mass.streams.get_plugin_source_stream(
+ plugin_source_id=media.custom_data["provider"],
+ output_format=AIRPLAY_PCM_FORMAT,
+ player_id=player_id,
+ )
elif media.queue_id and media.queue_id.startswith("ugp_"):
# special case: UGP stream
ugp_provider = cast(PlayerGroupProvider, self.mass.get_provider("player_group"))
ugp_stream = ugp_provider.ugp_streams[media.queue_id]
input_format = ugp_stream.base_pcm_format
audio_source = ugp_stream.subscribe_raw()
+ elif media.media_type == MediaType.RADIO and media.queue_id and media.queue_item_id:
+ # use single item stream request for radio streams
+ input_format = AIRPLAY_PCM_FORMAT
+ queue_item = self.mass.player_queues.get_item(media.queue_id, media.queue_item_id)
+ assert queue_item is not None # for type checking
+ audio_source = self.mass.streams.get_queue_item_stream(
+ queue_item=queue_item,
+ pcm_format=AIRPLAY_PCM_FORMAT,
+ )
elif media.queue_id and media.queue_item_id:
# regular queue (flow) stream request
input_format = AIRPLAY_FLOW_PCM_FORMAT
assert queue
start_queue_item = self.mass.player_queues.get_item(media.queue_id, media.queue_item_id)
assert start_queue_item
- audio_source = self.mass.streams.get_flow_stream(
+ audio_source = self.mass.streams.get_queue_flow_stream(
queue=queue,
start_queue_item=start_queue_item,
pcm_format=input_format,
# device switched to another source (or is powered off)
if raop_stream := airplay_player.raop_stream:
# ignore this if we just started playing to prevent false positives
- assert mass_player.elapsed_time
- if mass_player.elapsed_time > 10 and mass_player.state == PlayerState.PLAYING:
+ elapsed_time = (
+ 10 if mass_player.elapsed_time is None else mass_player.elapsed_time
+ )
+ if elapsed_time > 10 and mass_player.state == PlayerState.PLAYING:
raop_stream.prevent_playback = True
self.mass.create_task(self.monitor_prevent_playback(player_id))
elif "device-prevent-playback=0" in path:
title = queue.current_item.name
artist = ""
album = ""
- if queue.current_item.streamdetails and queue.current_item.streamdetails.stream_metadata:
+ if queue.current_item.streamdetails and queue.current_item.streamdetails.stream_title:
# stream title/metadata from radio/live stream
- title = queue.current_item.streamdetails.stream_metadata.title or ""
- artist = queue.current_item.streamdetails.stream_metadata.artist or ""
+ if " - " in queue.current_item.streamdetails.stream_title:
+ artist, title = queue.current_item.streamdetails.stream_title.split(" - ", 1)
+ else:
+ title = queue.current_item.streamdetails.stream_title
+ artist = ""
# set album to radio station name
album = queue.current_item.name
elif media_item := queue.current_item.media_item:
all_mute_entities: list[ConfigValueOption] = []
all_volume_entities: list[ConfigValueOption] = []
# collect all entities that are usable for player controls
+ if not hass.connected:
+ return ()
for state in await hass.get_states():
if "friendly_name" not in state["attributes"]:
# filter out invalid/unavailable players
output_format=UGP_FORMAT,
use_pre_announce=media.custom_data["use_pre_announce"],
)
+ elif media.media_type == MediaType.PLUGIN_SOURCE:
+ # special case: plugin source stream
+ audio_source = self.mass.streams.get_plugin_source_stream(
+ plugin_source_id=media.custom_data["provider"],
+ output_format=UGP_FORMAT,
+ player_id=player_id,
+ )
+ elif media.media_type == MediaType.RADIO:
+ # use single item stream request for radio streams
+ audio_source = self.mass.streams.get_queue_item_stream(
+ queue_item=self.mass.player_queues.get_item(media.queue_id, media.queue_item_id),
+ pcm_format=UGP_FORMAT,
+ )
elif media.queue_id and media.queue_item_id:
# regular queue stream request
- audio_source = self.mass.streams.get_flow_stream(
+ audio_source = self.mass.streams.get_queue_flow_stream(
queue=self.mass.player_queues.get(media.queue_id),
start_queue_item=self.mass.player_queues.get_item(
media.queue_id, media.queue_item_id
ProviderMapping,
Radio,
)
-from music_assistant_models.streamdetails import LivestreamMetadata, StreamDetails
+from music_assistant_models.streamdetails import StreamDetails
from tenacity import RetryError
from music_assistant.helpers.util import select_free_port
if latest_cut_marker:
latest_cut = latest_cut_marker.cut
title = latest_cut.title
- self._current_stream_details.stream_metadata = LivestreamMetadata(
- title=title,
- artist=", ".join([a.name for a in latest_cut.artists]),
- )
+ artist = ", ".join([a.name for a in latest_cut.artists])
+ self._current_stream_details.stream_title = f"{artist} - {title}"
async def _refresh_channels(self) -> bool:
self._channels = await self._client.channels
output_format=master_audio_format,
use_pre_announce=media.custom_data["use_pre_announce"],
)
+ elif media.media_type == MediaType.PLUGIN_SOURCE:
+ # special case: plugin source stream
+ audio_source = self.mass.streams.get_plugin_source_stream(
+ plugin_source_id=media.custom_data["provider"],
+ output_format=master_audio_format,
+ player_id=player_id,
+ )
elif media.queue_id.startswith("ugp_"):
# special case: UGP stream
ugp_provider: PlayerGroupProvider = self.mass.get_provider("player_group")
ugp_stream = ugp_provider.ugp_streams[media.queue_id]
# Filter is later applied in MultiClientStream
audio_source = ugp_stream.get_stream(master_audio_format, filter_params=None)
+ elif media.media_type == MediaType.RADIO:
+ # use single item stream request for radio streams
+ audio_source = self.mass.streams.get_queue_item_stream(
+ queue_item=self.mass.player_queues.get_item(media.queue_id, media.queue_item_id),
+ pcm_format=master_audio_format,
+ )
elif media.queue_id and media.queue_item_id:
# regular queue stream request
- audio_source = self.mass.streams.get_flow_stream(
+ audio_source = self.mass.streams.get_queue_flow_stream(
queue=self.mass.player_queues.get(media.queue_id),
start_queue_item=self.mass.player_queues.get_item(
media.queue_id, media.queue_item_id
self.mass.players.update(player_id, skip_forward=True)
self.mass.players.update(mass_player.synced_to, skip_forward=True)
- async def play_media(self, player_id: str, media: PlayerMedia) -> None:
+ async def play_media(self, player_id: str, media: PlayerMedia) -> None: # noqa: PLR0915
"""Handle PLAY MEDIA on given player."""
player = self.mass.players.get(player_id)
if player.synced_to:
output_format=DEFAULT_SNAPCAST_FORMAT,
use_pre_announce=media.custom_data["use_pre_announce"],
)
+ elif media.media_type == MediaType.PLUGIN_SOURCE:
+ # special case: plugin source stream
+ # consume the stream directly, so we can skip one step in between
+ # provider: PluginProvider = self.mass.get_provider(media.custom_data["provider"])
+ # plugin_source = provider.get_source()
+ # audio_source = (
+ # provider.get_audio_stream(player_id)
+ # if plugin_source.stream_type == StreamType.CUSTOM
+ # else plugin_source.path
+ # )
+ input_format = DEFAULT_SNAPCAST_FORMAT
+ audio_source = self.mass.streams.get_plugin_source_stream(
+ plugin_source_id=media.custom_data["provider"],
+ output_format=DEFAULT_SNAPCAST_FORMAT,
+ player_id=player_id,
+ )
elif media.queue_id.startswith("ugp_"):
# special case: UGP stream
ugp_provider: PlayerGroupProvider = self.mass.get_provider("player_group")
ugp_stream = ugp_provider.ugp_streams[media.queue_id]
input_format = ugp_stream.base_pcm_format
audio_source = ugp_stream.subscribe_raw()
+ elif media.media_type == MediaType.RADIO:
+ # use single item stream request for radio streams
+ input_format = DEFAULT_SNAPCAST_FORMAT
+ audio_source = self.mass.streams.get_queue_item_stream(
+ queue_item=self.mass.player_queues.get_item(media.queue_id, media.queue_item_id),
+ pcm_format=DEFAULT_SNAPCAST_FORMAT,
+ )
elif media.queue_id and media.queue_item_id:
# regular queue (flow) stream request
input_format = DEFAULT_SNAPCAST_PCM_FORMAT
- audio_source = self.mass.streams.get_flow_stream(
+ audio_source = self.mass.streams.get_queue_flow_stream(
queue=self.mass.player_queues.get(media.queue_id),
start_queue_item=self.mass.player_queues.get_item(
media.queue_id, media.queue_item_id
),
SOURCE_AIRPLAY: PlayerSource(
id=SOURCE_AIRPLAY,
- name="Spotify",
+ name="Airplay",
passive=True,
can_play_pause=True,
can_next_previous=True,
),
SOURCE_RADIO: PlayerSource(
id=SOURCE_RADIO,
- name="Spotify",
+ name="Radio",
passive=True,
can_play_pause=True,
can_next_previous=True,
# play a single uri/url
# note that this most probably will only work for (long running) radio streams
- if self.mass.config.get_raw_player_config_value(
- player_id, CONF_ENTRY_ENFORCE_MP3.key, CONF_ENTRY_ENFORCE_MP3.default_value
- ):
- media.uri = media.uri.replace(".flac", ".mp3")
+ # enforce mp3 here because Sonos really does not support FLAC streams without duration
+ media.uri = media.uri.replace(".flac", ".mp3")
await sonos_player.client.player.group.play_stream_url(
media.uri, {"name": media.title, "type": "track"}
)
EventType,
MediaType,
ProviderFeature,
- QueueOption,
StreamType,
)
-from music_assistant_models.errors import MediaNotFoundError
-from music_assistant_models.media_items import AudioFormat, PluginSource, ProviderMapping
-from music_assistant_models.streamdetails import LivestreamMetadata, StreamDetails
+from music_assistant_models.media_items import AudioFormat
+from music_assistant_models.player import PlayerMedia
from music_assistant.constants import CONF_ENTRY_WARN_PREVIEW
-from music_assistant.helpers.audio import get_chunksize
-from music_assistant.helpers.ffmpeg import get_ffmpeg_stream
-from music_assistant.helpers.process import AsyncProcess
-from music_assistant.models.music_provider import MusicProvider
+from music_assistant.helpers.process import AsyncProcess, check_output
+from music_assistant.models.plugin import PluginProvider, PluginSource
from music_assistant.providers.spotify.helpers import get_librespot_binary
if TYPE_CHECKING:
- from collections.abc import AsyncGenerator
-
from aiohttp.web import Request
from music_assistant_models.config_entries import ConfigValueType, ProviderConfig
from music_assistant_models.event import MassEvent
from music_assistant.models import ProviderInstanceType
CONF_MASS_PLAYER_ID = "mass_player_id"
-CONF_CUSTOM_NAME = "custom_name"
CONF_HANDOFF_MODE = "handoff_mode"
CONNECT_ITEM_ID = "spotify_connect"
),
required=True,
),
- ConfigEntry(
- key=CONF_CUSTOM_NAME,
- type=ConfigEntryType.STRING,
- label="Name for the Spotify Connect Player",
- default_value="",
- description="Select what name should be shown in the Spotify app as speaker name. "
- "Leave blank to use the Music Assistant player's name",
- required=False,
- ),
# ConfigEntry(
# key=CONF_HANDOFF_MODE,
# type=ConfigEntryType.BOOLEAN,
# "When enabling handoff mode, the Spotify Connect plugin will instead "
# "forward the Spotify playback request to the Music Assistant Queue, so basically "
# "the spotify app can be used to initiate playback, but then MA will take over "
- # "the playback and manage the queue, the normal operating mode of MA. \n\n"
+ # "the playback and manage the queue, which is the normal operating mode of MA. \n\n"
# "This mode however means that the Spotify app will not report the actual playback ",
# required=False,
# ),
)
-class SpotifyConnectProvider(MusicProvider):
+class SpotifyConnectProvider(PluginProvider):
"""Implementation of a Spotify Connect Plugin."""
def __init__(
self._runner_task: asyncio.Task | None = None # type: ignore[type-arg]
self._librespot_proc: AsyncProcess | None = None
self._librespot_started = asyncio.Event()
- self._player_connected: bool = False
- self._current_streamdetails: StreamDetails | None = None
- self._audio_buffer: asyncio.Queue[bytes] = asyncio.Queue(60)
+ self.named_pipe = f"/tmp/{self.instance_id}" # noqa: S108
+ self._source_details = PluginSource(
+ id=self.lookup_key,
+ name=self.manifest.name,
+ # we set passive to true because we
+ # dont allow this source to be selected directly
+ passive=True,
+ # TODO: implement controlling spotify from MA itself
+ can_play_pause=False,
+ can_seek=False,
+ can_next_previous=False,
+ audio_format=AudioFormat(
+ content_type=ContentType.PCM_S16LE,
+ codec_type=ContentType.PCM_S16LE,
+ sample_rate=44100,
+ bit_depth=16,
+ channels=2,
+ ),
+ metadata=PlayerMedia(
+ "Spotify Connect",
+ ),
+ stream_type=StreamType.NAMED_PIPE,
+ path=self.named_pipe,
+ )
+ self._audio_buffer: asyncio.Queue[bytes] = asyncio.Queue(10)
self._on_unload_callbacks: list[Callable[..., None]] = [
self.mass.subscribe(
self._on_mass_player_event,
"""Return the features supported by this Provider."""
return {ProviderFeature.AUDIO_SOURCE}
- @property
- def name(self) -> str:
- """Return (custom) friendly name for this provider instance."""
- if custom_name := cast(str, self.config.get_value(CONF_CUSTOM_NAME)):
- return f"{self.manifest.name}: {custom_name}"
- if player := self.mass.players.get(self.mass_player_id):
- return f"{self.manifest.name}: {player.display_name}"
- return super().name
-
async def handle_async_init(self) -> None:
"""Handle async initialization of the provider."""
self._librespot_bin = await get_librespot_binary()
for callback in self._on_unload_callbacks:
callback()
- async def get_sources(self) -> list[PluginSource]:
- """Get all audio sources provided by this provider."""
- # we only have passive/hidden sources so no need to supply this listing
- return []
-
- async def get_source(self, prov_source_id: str) -> PluginSource:
- """Get AudioSource details by id."""
- if prov_source_id != CONNECT_ITEM_ID:
- raise MediaNotFoundError(f"Invalid source id: {prov_source_id}")
- return PluginSource(
- item_id=CONNECT_ITEM_ID,
- provider=self.instance_id,
- name="Spotify Connect",
- provider_mappings={
- ProviderMapping(
- item_id=CONNECT_ITEM_ID,
- provider_domain=self.domain,
- provider_instance=self.instance_id,
- audio_format=AudioFormat(content_type=ContentType.OGG),
- )
- },
- )
-
- async def get_stream_details(self, item_id: str, media_type: MediaType) -> StreamDetails:
- """Return the streamdetails to stream an audiosource provided by this plugin."""
- self._current_streamdetails = streamdetails = StreamDetails(
- item_id=CONNECT_ITEM_ID,
- provider=self.instance_id,
- audio_format=AudioFormat(
- content_type=ContentType.OGG,
- ),
- media_type=MediaType.PLUGIN_SOURCE,
- allow_seek=False,
- can_seek=False,
- stream_type=StreamType.CUSTOM,
- )
- return streamdetails
-
- async def get_audio_stream(
- self, streamdetails: StreamDetails, seek_position: int = 0
- ) -> AsyncGenerator[bytes, None]:
- """Return the audio stream for the provider item."""
- if not self._librespot_proc or self._librespot_proc.closed:
- raise MediaNotFoundError(f"Librespot not ready for: {streamdetails.item_id}")
- self._player_connected = True
- try:
- while True:
- yield await self._audio_buffer.get()
- finally:
- self._player_connected = False
- await asyncio.sleep(2)
- if not self._player_connected:
- # handle situation where the stream is disconnected from the MA player
- # easiest way to unmark this librespot instance as active player is to close it
- await self._librespot_proc.close(True)
+ def get_source(self) -> PluginSource:
+ """Get (audio)source details for this plugin."""
+ return self._source_details
async def _librespot_runner(self) -> None:
"""Run the spotify connect daemon in a background task."""
assert self._librespot_bin
- if not (player := self.mass.players.get(self.mass_player_id)):
- raise MediaNotFoundError(f"Player not found: {self.mass_player_id}")
- name = cast(str, self.config.get_value(CONF_CUSTOM_NAME) or player.display_name)
- self.logger.info("Starting Spotify Connect background daemon %s", name)
+ self.logger.info("Starting Spotify Connect background daemon")
os.environ["MASS_CALLBACK"] = f"{self.mass.streams.base_url}/{self.instance_id}"
+ await check_output("rm", "-f", self.named_pipe)
+ await asyncio.sleep(0.5)
+ await check_output("mkfifo", self.named_pipe)
+ await asyncio.sleep(0.5)
try:
args: list[str] = [
self._librespot_bin,
"--name",
- name,
+ self.name,
"--cache",
self.cache_dir,
"--disable-audio-cache",
"320",
"--backend",
"pipe",
+ "--device",
+ self.named_pipe,
"--dither",
"none",
- "--passthrough",
# disable volume control
"--mixer",
"softvol",
"fixed",
"--initial-volume",
"100",
+ "--enable-volume-normalisation",
# forward events to the events script
"--onevent",
str(EVENTS_SCRIPT),
"--emit-sink-events",
]
self._librespot_proc = librespot = AsyncProcess(
- args, stdout=True, stderr=True, name=f"librespot[{name}]"
+ args, stdout=False, stderr=True, name=f"librespot[{self.name}]"
)
await librespot.start()
# keep reading logging from stderr until exit
- async def log_reader() -> None:
- async for line in librespot.iter_stderr():
- if (
- not self._librespot_started.is_set()
- and "Using StdoutSink (pipe) with format: S16" in line
- ):
- self._librespot_started.set()
- if "error sending packet Os" in line:
- continue
- if "dropping truncated packet" in line:
- continue
- if "couldn't parse packet from " in line:
- continue
- self.logger.debug(line)
-
- async def audio_reader() -> None:
- chunksize = get_chunksize(AudioFormat(content_type=ContentType.OGG))
- async for chunk in get_ffmpeg_stream(
- librespot.iter_chunked(chunksize),
- input_format=AudioFormat(content_type=ContentType.OGG),
- output_format=AudioFormat(content_type=ContentType.OGG),
- extra_input_args=["-readrate", "1.0"],
+ async for line in librespot.iter_stderr():
+ if (
+ not self._librespot_started.is_set()
+ and "Using StdoutSink (pipe) with format: S16" in line
):
- if librespot.closed or self._stop_called:
- break
- if not self._player_connected:
- continue
- await self._audio_buffer.put(chunk)
+ self._librespot_started.set()
+ if "error sending packet Os" in line:
+ continue
+ if "dropping truncated packet" in line:
+ continue
+ if "couldn't parse packet from " in line:
+ continue
+ self.logger.debug(line)
- await asyncio.gather(log_reader(), audio_reader())
except asyncio.CancelledError:
await librespot.close(True)
finally:
- self.logger.info("Spotify Connect background daemon stopped for %s", name)
+ self.logger.info("Spotify Connect background daemon stopped for %s", self.name)
+ await check_output("rm", "-f", self.named_pipe)
# auto restart if not stopped manually
if not self._stop_called and self._librespot_started.is_set():
self._setup_player_daemon()
# handle session connected event
# this player has become the active spotify connect player
# we need to start the playback
- if not self._player_connected and json_data.get("event") in (
- "session_connected",
- "play_request_id_changed",
+ self.logger.error("%s - %s", self.name, json_data.get("event"))
+ if not self._source_details.in_use_by and json_data.get("event") in (
+ # "session_connected",
+ # "loading",
+ "sink",
):
- # initiate playback by selecting the pluginsource mediaitem on the player
- pluginsource_item = await self.get_source(CONNECT_ITEM_ID)
+ # initiate playback by selecting this source on the default player
self.mass.create_task(
- self.mass.player_queues.play_media(
- queue_id=self.mass_player_id,
- media=pluginsource_item,
- option=QueueOption.REPLACE,
- )
+ self.mass.players.select_source(self.mass_player_id, self.lookup_key)
)
- if self._current_streamdetails:
- # parse metadata fields
- if "common_metadata_fields" in json_data:
- title = json_data["common_metadata_fields"].get("name", "Unknown")
- if artists := json_data.get("track_metadata_fields", {}).get("artists"):
- artist = artists[0]
- else:
- artist = "Unknown"
- if images := json_data["common_metadata_fields"].get("covers"):
- image_url = images[0]
- else:
- image_url = None
- self._current_streamdetails.stream_metadata = LivestreamMetadata(
- title=title, artist=artist, image_url=image_url
- )
+ # parse metadata fields
+ if "common_metadata_fields" in json_data:
+ uri = json_data["common_metadata_fields"].get("uri", "Unknown")
+ title = json_data["common_metadata_fields"].get("name", "Unknown")
+ if artists := json_data.get("track_metadata_fields", {}).get("artists"):
+ artist = artists[0]
+ else:
+ artist = "Unknown"
+ album = json_data["common_metadata_fields"].get("album", "Unknown")
+ if images := json_data["common_metadata_fields"].get("covers"):
+ image_url = images[0]
+ else:
+ image_url = None
+ if self._source_details.metadata is None:
+ self._source_details.metadata = PlayerMedia(uri, media_type=MediaType.TRACK)
+ self._source_details.metadata.uri = uri
+ self._source_details.metadata.title = title
+ self._source_details.metadata.artist = artist
+ self._source_details.metadata.album = album
+ self._source_details.metadata.image_url = image_url
return Response()