) -> None:
"""Handle playback/select of given plugin source on player."""
plugin_source = plugin_prov.get_source()
- if plugin_prov.in_use_by and plugin_prov.in_use_by != player.player_id:
- raise PlayerCommandFailed(
- f"Source {plugin_source.name} is already in use by another player"
- )
player.active_source = plugin_source.id
stream_url = self.mass.streams.get_plugin_source_url(plugin_source.id, player.player_id)
await self.play_media(
for plugin_prov in self.mass.get_providers(ProviderType.PLUGIN):
if ProviderFeature.AUDIO_SOURCE not in plugin_prov.supported_features:
continue
- if plugin_prov.in_use_by and plugin_prov.in_use_by != player.player_id:
- continue
plugin_source = plugin_prov.get_source()
+ if plugin_source.in_use_by and plugin_source.in_use_by != player.player_id:
+ continue
if plugin_source.id in player_source_ids:
continue
player.source_list.append(plugin_source)
from __future__ import annotations
+import asyncio
import os
import urllib.parse
from collections.abc import AsyncGenerator
"Accept-Ranges": "none",
"Content-Type": f"audio/{output_format.output_format_str}",
}
+
resp = web.StreamResponse(
status=200,
reason="OK",
return resp
# all checks passed, start streaming!
- self.logger.debug(
- "Start serving audio stream for PluginSource %s (%s) to %s",
- plugin_source.name,
- plugin_source.id,
- player.display_name,
- )
async for chunk in self.get_plugin_source_stream(
plugin_source_id=plugin_source_id,
output_format=output_format,
player = self.mass.players.get(player_id)
plugin_prov: PluginProvider = self.mass.get_provider(plugin_source_id)
plugin_source = plugin_prov.get_source()
- if plugin_prov.in_use_by and plugin_prov.in_use_by != player_id:
+ if plugin_source.in_use_by and plugin_source.in_use_by != player_id:
raise RuntimeError(
- f"PluginSource plugin_source.name is already in use by {plugin_prov.in_use_by}"
+ f"PluginSource plugin_source.name is already in use by {plugin_source.in_use_by}"
)
+ self.logger.debug("Start streaming PluginSource %s to %s", plugin_source_id, player_id)
audio_input = (
plugin_prov.get_audio_stream(player_id)
if plugin_source.stream_type == StreamType.CUSTOM
else plugin_source.path
)
- chunk_size = int(get_chunksize(output_format, 1) / 10)
player.active_source = plugin_source_id
+ plugin_source.in_use_by = player_id
try:
async for chunk in get_ffmpeg_stream(
audio_input=audio_input,
input_format=plugin_source.audio_format,
output_format=output_format,
- chunk_size=chunk_size,
filter_params=player_filter_params,
extra_input_args=["-re"],
):
yield chunk
finally:
+ self.logger.debug(
+ "Finished streaming PluginSource %s to %s", plugin_source_id, player_id
+ )
+ await asyncio.sleep(0.5)
player.active_source = player.player_id
+ plugin_source.in_use_by = None
async def get_queue_item_stream(
self,
"-ignore_unknown",
"-protocol_whitelist",
"file,hls,http,https,tcp,tls,crypto,pipe,data,fd,rtp,udp,concat",
+ "-probesize",
+ "8192",
]
# collect input args
input_args = []
+
if extra_input_args:
input_args += extra_input_args
if input_path.startswith("http"):
async def start(self) -> None:
"""Perform Async init of process."""
- for attempt in range(2):
- try:
- self.proc = await asyncio.create_subprocess_exec(
- *self._args,
- stdin=asyncio.subprocess.PIPE if self._stdin is True else self._stdin,
- stdout=asyncio.subprocess.PIPE if self._stdout is True else self._stdout,
- stderr=asyncio.subprocess.PIPE if self._stderr is True else self._stderr,
- # because we're exchanging big amounts of (audio) data with pipes
- # it makes sense to extend the pipe size and (buffer) limits a bit
- limit=1000000 if attempt == 0 else 65536,
- pipesize=1000000 if attempt == 0 else -1,
- )
- break
- except PermissionError:
- if attempt > 0:
- raise
- LOGGER.error(
- "Detected that you are running the (docker) container without "
- "permissive access rights. This will impact performance !"
- )
-
+ self.proc = await asyncio.create_subprocess_exec(
+ *self._args,
+ stdin=asyncio.subprocess.PIPE if self._stdin is True else self._stdin,
+ stdout=asyncio.subprocess.PIPE if self._stdout is True else self._stdout,
+ stderr=asyncio.subprocess.PIPE if self._stderr is True else self._stderr,
+ )
self.logger.log(
VERBOSE_LOG_LEVEL, "Process %s started with PID %s", self.name, self.proc.pid
)
metadata=field_options(serialize="omit", deserialize=pass_through),
repr=False,
)
+ # in_use_by specifies the player id that is currently using this plugin (if any)
+ in_use_by: str | None = field(
+ default=None,
+ compare=False,
+ metadata=field_options(serialize="omit", deserialize=pass_through),
+ repr=False,
+ )
class PluginProvider(Provider):
Plugin Provider implementations should inherit from this base model.
"""
- @property
- def in_use_by(self) -> str | None:
- """Return player id that is currently using this plugin (if any)."""
- for player in self.mass.players:
- if player.active_source == self.lookup_key:
- return player.player_id
- return None
-
def get_source(self) -> PluginSource: # type: ignore[return]
"""Get (audio)source details for this plugin."""
# Will only be called if ProviderFeature.AUDIO_SOURCE is declared
# handle session connected event
# this player has become the active spotify connect player
# we need to start the playback
- if json_data.get("event") in ("sink",) and (
- not self.in_use_by
- or ((player := self.mass.players.get(self.in_use_by)) and player.state == "idle")
- ):
+ if json_data.get("event") in ("sink", "playing") and (not self._source_details.in_use_by):
# initiate playback by selecting this source on the default player
+ self.logger.error("Initiating playback on %s", self.mass_player_id)
self.mass.create_task(
self.mass.players.select_source(self.mass_player_id, self.lookup_key)
)
+ self._source_details.in_use_by = self.mass_player_id
# parse metadata fields
if "common_metadata_fields" in json_data: