From 0eac0c0211ca7c9e2facab68f3c583357b445d23 Mon Sep 17 00:00:00 2001 From: OzGav Date: Tue, 9 Sep 2025 19:32:40 +1000 Subject: [PATCH] Add Atomic Port Validation and Cleanup to Squeezelite Provider (#2352) --- .../providers/squeezelite/provider.py | 125 ++++++++++++------ 1 file changed, 88 insertions(+), 37 deletions(-) diff --git a/music_assistant/providers/squeezelite/provider.py b/music_assistant/providers/squeezelite/provider.py index f6c8fef2..e0778490 100644 --- a/music_assistant/providers/squeezelite/provider.py +++ b/music_assistant/providers/squeezelite/provider.py @@ -23,14 +23,23 @@ from .player import SqueezelitePlayer if TYPE_CHECKING: from aioslimproto.client import SlimClient + from music_assistant_models.config_entries import ProviderConfig + from music_assistant_models.provider import ProviderManifest + + from music_assistant import MusicAssistant class SqueezelitePlayerProvider(PlayerProvider): """Player provider for players using slimproto (like Squeezelite).""" - def __init__(self, *args, **kwargs) -> None: + def __init__( + self, + mass: MusicAssistant, + manifest: ProviderManifest, + config: ProviderConfig, + ) -> None: """Initialize the provider.""" - super().__init__(*args, **kwargs) + super().__init__(mass, manifest, config) self.slimproto: SlimServer | None = None self._players: dict[str, SqueezelitePlayer] = {} @@ -51,37 +60,75 @@ class SqueezelitePlayerProvider(PlayerProvider): logging.getLogger("aioslimproto").setLevel(logging.DEBUG) else: logging.getLogger("aioslimproto").setLevel(self.logger.level + 10) - # setup slimproto server - control_port = self.config.get_value(CONF_PORT) - if await is_port_in_use(control_port): - msg = f"Port {control_port} is not available" - raise SetupFailedError(msg) - telnet_port = self.config.get_value(CONF_CLI_TELNET_PORT) - if telnet_port is not None and await is_port_in_use(telnet_port): - msg = f"Telnet port {telnet_port} is not available" - raise SetupFailedError(msg) - json_port = self.config.get_value(CONF_CLI_JSON_PORT) - if json_port is not None and await is_port_in_use(json_port): - msg = f"JSON port {json_port} is not available" + + # Get all port configurations + control_port = cast("int", self.config.get_value(CONF_PORT)) + telnet_port = cast("int | None", self.config.get_value(CONF_CLI_TELNET_PORT)) + json_port = cast("int | None", self.config.get_value(CONF_CLI_JSON_PORT)) + + # Validate ALL required ports before starting ANY services + await self._validate_all_ports(control_port, telnet_port, json_port) + + # Only proceed with server creation after all ports are validated + try: + self.slimproto = SlimServer( + cli_port=telnet_port or None, + cli_port_json=json_port or None, + ip_address=self.mass.streams.publish_ip, + name="Music Assistant", + control_port=control_port, + ) + # start slimproto socket server + await self.slimproto.start() + except Exception as err: + # Ensure cleanup on any initialization failure + await self._cleanup_server() + raise SetupFailedError(f"Failed to start SlimProto server: {err}") from err + + async def _validate_all_ports( + self, control_port: int, telnet_port: int | None, json_port: int | None + ) -> None: + """Validate that all required ports are available before starting any services.""" + ports_to_check = [(control_port, "SlimProto control")] + + if telnet_port and telnet_port > 0: + ports_to_check.append((telnet_port, "Telnet CLI")) + + if json_port and json_port > 0: + ports_to_check.append((json_port, "JSON-RPC CLI")) + + # Collect all port conflicts before raising any errors + occupied_ports = [] + for port, port_description in ports_to_check: + if await is_port_in_use(port): + occupied_ports.append(f"{port_description} port {port}") + + # If any ports are occupied, raise a comprehensive error message + if occupied_ports: + if len(occupied_ports) == 1: + msg = f"{occupied_ports[0]} is not available" + else: + msg = f"Multiple ports are not available: {', '.join(occupied_ports)}" raise SetupFailedError(msg) - # silence aioslimproto logger a bit - if self.logger.isEnabledFor(VERBOSE_LOG_LEVEL): - logging.getLogger("aioslimproto").setLevel(logging.DEBUG) - else: - logging.getLogger("aioslimproto").setLevel(self.logger.level + 10) - self.slimproto = SlimServer( - cli_port=telnet_port or None, - cli_port_json=json_port or None, - ip_address=self.mass.streams.publish_ip, - name="Music Assistant", - control_port=control_port, - ) - # start slimproto socket server - await self.slimproto.start() + + async def _cleanup_server(self) -> None: + """Ensure complete cleanup of the SlimProto server on initialization failure.""" + if self.slimproto: + try: + await self.slimproto.stop() + except Exception as err: + self.logger.warning("Error stopping SlimProto server during cleanup: %s", err) + finally: + self.slimproto = None + + # Clear any associated state that might have been created + self._players.clear() + self._multi_client_streams.clear() async def loaded_in_mass(self) -> None: """Call after the provider has been loaded.""" await super().loaded_in_mass() + assert self.slimproto is not None # for type checker self.slimproto.subscribe(self._handle_slimproto_event) self.mass.streams.register_dynamic_route( "/slimproto/multi", self._serve_multi_client_stream @@ -96,28 +143,32 @@ class SqueezelitePlayerProvider(PlayerProvider): async def unload(self, is_removed: bool = False) -> None: """Handle unload/close of the provider.""" - if self.slimproto: - await self.slimproto.stop() + # Ensure complete cleanup + await self._cleanup_server() self.mass.streams.unregister_dynamic_route("/slimproto/multi") self.mass.streams.unregister_dynamic_route("/jsonrpc.js") def get_corrected_elapsed_milliseconds(self, slimplayer: SlimClient) -> int: """Return corrected elapsed milliseconds for a slimplayer.""" - sync_delay = self.mass.config.get_raw_player_config_value( - slimplayer.player_id, CONF_SYNC_ADJUST, 0 + sync_delay = cast( + "int", + self.mass.config.get_raw_player_config_value(slimplayer.player_id, CONF_SYNC_ADJUST, 0), ) - return slimplayer.elapsed_milliseconds - sync_delay + return cast("int", slimplayer.elapsed_milliseconds - sync_delay) def _handle_slimproto_event( self, event: SlimEvent, ) -> None: - if self.mass.closing: + """Handle events from SlimProto players.""" + # Exit early if system is closing or slimproto server is not initialized + if self.mass.closing or not self.slimproto: return - # handle new player connect (or reconnect of existing player) + # Handle new player connect (or reconnect of existing player) if event.type == SlimEventType.PLAYER_CONNECTED: - if not (slimclient := self.slimproto.get_player(event.player_id)): + slimclient = self.slimproto.get_player(event.player_id) + if not slimclient: return # should not happen, but guard anyways player = SqueezelitePlayer(self, event.player_id, slimclient) self.mass.create_task(player.setup()) @@ -128,7 +179,7 @@ class SqueezelitePlayerProvider(PlayerProvider): if TYPE_CHECKING: player = cast("SqueezelitePlayer", player) - # handle player disconnect + # Handle player disconnect if event.type == SlimEventType.PLAYER_DISCONNECTED: self.mass.create_task(self.mass.players.unregister(player.player_id)) return -- 2.34.1