from music_assistant.helpers.util import get_ip, get_ips, select_free_port, try_parse_bool
from music_assistant.helpers.webserver import Webserver
from music_assistant.models.core_controller import CoreController
-from music_assistant.models.plugin import PluginProvider
if TYPE_CHECKING:
from music_assistant_models.config_entries import CoreConfig
"This is an advanced setting that should normally "
"not be adjusted in regular setups.",
category="advanced",
+ required=False,
),
ConfigEntry(
key=CONF_BIND_IP,
"This is an advanced setting that should normally "
"not be adjusted in regular setups.",
category="advanced",
+ required=False,
),
)
"/announcement/{player_id}.{fmt}",
self.serve_announcement_stream,
),
- (
- "*",
- "/pluginsource/{provider_id}/{source_id}/{player_id}.{fmt}",
- self.serve_plugin_source_stream,
- ),
],
)
return resp
- async def serve_plugin_source_stream(self, request: web.Request) -> web.Response:
- """Stream PluginSource audio to a player."""
- self._log_request(request)
- provider_id = request.match_info["provider_id"]
- provider: PluginProvider | None
- if not (provider := self.mass.get_provider(provider_id)):
- raise web.HTTPNotFound(reason=f"Unknown Provider: {provider_id}")
- source_id = request.match_info["source_id"]
- if not (source := await provider.get_source(source_id)):
- raise web.HTTPNotFound(reason=f"Unknown PluginSource: {source_id}")
- try:
- streamdetails = await provider.get_stream_details(source_id, "plugin_source")
- except Exception:
- err_msg = f"No streamdetails for PluginSource: {source_id}"
- self.logger.error(err_msg)
- raise web.HTTPNotFound(reason=err_msg)
-
- # work out output format/details
- player_id = request.match_info["player_id"]
- player = self.mass.players.get(player_id)
- if not player:
- raise web.HTTPNotFound(reason=f"Unknown Player: {player_id}")
- output_format = await self._get_output_format(
- output_format_str=request.match_info["fmt"],
- player=player,
- default_sample_rate=streamdetails.audio_format.sample_rate,
- default_bit_depth=streamdetails.audio_format.bit_depth,
- )
-
- # prepare request, add some DLNA/UPNP compatible headers
- headers = {
- **DEFAULT_STREAM_HEADERS,
- "icy-name": source.name,
- }
- resp = web.StreamResponse(
- status=200,
- reason="OK",
- headers=headers,
- )
- resp.content_type = f"audio/{output_format.output_format_str}"
- http_profile: str = await self.mass.config.get_player_config_value(
- player_id, CONF_HTTP_PROFILE
- )
- if http_profile == "forced_content_length" and streamdetails.duration:
- # guess content length based on duration
- resp.content_length = get_chunksize(output_format, streamdetails.duration)
- elif http_profile == "chunked":
- resp.enable_chunked_encoding()
-
- await resp.prepare(request)
-
- # return early if this is not a GET request
- if request.method != "GET":
- return resp
-
- # all checks passed, start streaming!
- self.logger.debug(
- "Start serving audio stream for PluginSource %s (%s) to %s",
- source.name,
- source.id,
- player.display_name,
- )
- async for chunk in self.get_plugin_source_stream(
- streamdetails,
- output_format=output_format,
- ):
- try:
- await resp.write(chunk)
- except (BrokenPipeError, ConnectionResetError, ConnectionError):
- break
- if streamdetails.stream_error:
- self.logger.error(
- "Error streaming PluginSource %s (%s) to %s",
- source.name,
- source.uri,
- player.display_name,
- )
- return resp
-
def get_command_url(self, player_or_queue_id: str, command: str) -> str:
"""Get the url for the special command stream."""
return f"{self.base_url}/command/{player_or_queue_id}/{command}.mp3"
):
yield chunk
- async def get_plugin_source_stream(
- self,
- streamdetails: StreamDetails,
- output_format: AudioFormat,
- ) -> AsyncGenerator[tuple[bool, bytes], None]:
- """Get the audio stream for a PluginSource."""
- streamdetails.seek_position = 0
- extra_input_args = ["-re"]
- # work out audio source for these streamdetails
- if streamdetails.stream_type == StreamType.CUSTOM:
- audio_source = self.mass.get_provider(streamdetails.provider).get_audio_stream(
- streamdetails,
- seek_position=streamdetails.seek_position,
- )
- elif streamdetails.stream_type == StreamType.HLS:
- substream = await get_hls_substream(self.mass, streamdetails.path)
- audio_source = substream.path
- else:
- audio_source = streamdetails.path
-
- # add support for decryption key provided in streamdetails
- if streamdetails.decryption_key:
- extra_input_args += ["-decryption_key", streamdetails.decryption_key]
-
- async for chunk in get_ffmpeg_stream(
- audio_input=audio_source,
- input_format=streamdetails.audio_format,
- output_format=output_format,
- extra_input_args=extra_input_args,
- ):
- yield chunk
-
def _log_request(self, request: web.Request) -> None:
"""Log request."""
if not self.logger.isEnabledFor(VERBOSE_LOG_LEVEL):
self._librespot_started = asyncio.Event()
self._player_connected: bool = False
self._current_streamdetails: StreamDetails | None = None
+ self._audio_buffer: asyncio.Queue[bytes] = asyncio.Queue(60)
self._on_unload_callbacks: list[Callable[..., None]] = [
self.mass.subscribe(
self._on_mass_player_event,
item_id=CONNECT_ITEM_ID,
provider=self.instance_id,
audio_format=AudioFormat(
- content_type=ContentType.PCM_S16LE,
+ content_type=ContentType.OGG,
),
media_type=MediaType.PLUGIN_SOURCE,
allow_seek=False,
can_seek=False,
stream_type=StreamType.CUSTOM,
- extra_input_args=["-re"],
)
return streamdetails
if not self._librespot_proc or self._librespot_proc.closed:
raise MediaNotFoundError(f"Librespot not ready for: {streamdetails.item_id}")
self._player_connected = True
- chunksize = get_chunksize(streamdetails.audio_format)
- try:
- async for chunk in self._librespot_proc.iter_chunked(chunksize):
- if self._librespot_proc.closed or self._stop_called:
- break
- yield chunk
- finally:
- self._player_connected = False
- await asyncio.sleep(2)
- if not self._player_connected:
- # handle situation where the stream is disconnected from the MA player
- # easiest way to unmark this librespot instance as active player is to close it
- await self._librespot_proc.close(True)
+ while True:
+ yield await self._audio_buffer.get()
async def _librespot_runner(self) -> None:
"""Run the spotify connect daemon in a background task."""
"pipe",
"--dither",
"none",
+ "--passthrough",
# disable volume control
"--mixer",
"softvol",
args, stdout=True, stderr=True, name=f"librespot[{name}]"
)
await librespot.start()
+
# keep reading logging from stderr until exit
- async for line in librespot.iter_stderr():
- if (
- not self._librespot_started.is_set()
- and "Using StdoutSink (pipe) with format: S16" in line
- ):
- self._librespot_started.set()
- if "error sending packet Os" in line:
- continue
- if "dropping truncated packet" in line:
- continue
- if "couldn't parse packet from " in line:
- continue
- self.logger.debug(line)
+ async def log_reader() -> None:
+ async for line in librespot.iter_stderr():
+ if (
+ not self._librespot_started.is_set()
+ and "Using StdoutSink (pipe) with format: S16" in line
+ ):
+ self._librespot_started.set()
+ if "error sending packet Os" in line:
+ continue
+ if "dropping truncated packet" in line:
+ continue
+ if "couldn't parse packet from " in line:
+ continue
+ self.logger.debug(line)
+
+ async def audio_reader() -> None:
+ chunksize = get_chunksize(AudioFormat(content_type=ContentType.OGG))
+ async for chunk in librespot.iter_chunked(chunksize):
+ if librespot.closed or self._stop_called:
+ break
+ await self._audio_buffer.put(chunk)
+
+ await asyncio.gather(log_reader(), audio_reader())
except asyncio.CancelledError:
await librespot.close(True)
finally: