Add Atomic Port Validation and Cleanup to Squeezelite Provider (#2352)
authorOzGav <gavnosp@hotmail.com>
Tue, 9 Sep 2025 09:32:40 +0000 (19:32 +1000)
committerGitHub <noreply@github.com>
Tue, 9 Sep 2025 09:32:40 +0000 (11:32 +0200)
music_assistant/providers/squeezelite/provider.py

index f6c8fef2fde344cd23960ab0aabc88ff435a9112..e0778490ce2fc878f94007c901a9fe56b432d8ae 100644 (file)
@@ -23,14 +23,23 @@ from .player import SqueezelitePlayer
 
 if TYPE_CHECKING:
     from aioslimproto.client import SlimClient
+    from music_assistant_models.config_entries import ProviderConfig
+    from music_assistant_models.provider import ProviderManifest
+
+    from music_assistant import MusicAssistant
 
 
 class SqueezelitePlayerProvider(PlayerProvider):
     """Player provider for players using slimproto (like Squeezelite)."""
 
-    def __init__(self, *args, **kwargs) -> None:
+    def __init__(
+        self,
+        mass: MusicAssistant,
+        manifest: ProviderManifest,
+        config: ProviderConfig,
+    ) -> None:
         """Initialize the provider."""
-        super().__init__(*args, **kwargs)
+        super().__init__(mass, manifest, config)
         self.slimproto: SlimServer | None = None
         self._players: dict[str, SqueezelitePlayer] = {}
 
@@ -51,37 +60,75 @@ class SqueezelitePlayerProvider(PlayerProvider):
             logging.getLogger("aioslimproto").setLevel(logging.DEBUG)
         else:
             logging.getLogger("aioslimproto").setLevel(self.logger.level + 10)
-        # setup slimproto server
-        control_port = self.config.get_value(CONF_PORT)
-        if await is_port_in_use(control_port):
-            msg = f"Port {control_port} is not available"
-            raise SetupFailedError(msg)
-        telnet_port = self.config.get_value(CONF_CLI_TELNET_PORT)
-        if telnet_port is not None and await is_port_in_use(telnet_port):
-            msg = f"Telnet port {telnet_port} is not available"
-            raise SetupFailedError(msg)
-        json_port = self.config.get_value(CONF_CLI_JSON_PORT)
-        if json_port is not None and await is_port_in_use(json_port):
-            msg = f"JSON port {json_port} is not available"
+
+        # Get all port configurations
+        control_port = cast("int", self.config.get_value(CONF_PORT))
+        telnet_port = cast("int | None", self.config.get_value(CONF_CLI_TELNET_PORT))
+        json_port = cast("int | None", self.config.get_value(CONF_CLI_JSON_PORT))
+
+        # Validate ALL required ports before starting ANY services
+        await self._validate_all_ports(control_port, telnet_port, json_port)
+
+        # Only proceed with server creation after all ports are validated
+        try:
+            self.slimproto = SlimServer(
+                cli_port=telnet_port or None,
+                cli_port_json=json_port or None,
+                ip_address=self.mass.streams.publish_ip,
+                name="Music Assistant",
+                control_port=control_port,
+            )
+            # start slimproto socket server
+            await self.slimproto.start()
+        except Exception as err:
+            # Ensure cleanup on any initialization failure
+            await self._cleanup_server()
+            raise SetupFailedError(f"Failed to start SlimProto server: {err}") from err
+
+    async def _validate_all_ports(
+        self, control_port: int, telnet_port: int | None, json_port: int | None
+    ) -> None:
+        """Validate that all required ports are available before starting any services."""
+        ports_to_check = [(control_port, "SlimProto control")]
+
+        if telnet_port and telnet_port > 0:
+            ports_to_check.append((telnet_port, "Telnet CLI"))
+
+        if json_port and json_port > 0:
+            ports_to_check.append((json_port, "JSON-RPC CLI"))
+
+        # Collect all port conflicts before raising any errors
+        occupied_ports = []
+        for port, port_description in ports_to_check:
+            if await is_port_in_use(port):
+                occupied_ports.append(f"{port_description} port {port}")
+
+        # If any ports are occupied, raise a comprehensive error message
+        if occupied_ports:
+            if len(occupied_ports) == 1:
+                msg = f"{occupied_ports[0]} is not available"
+            else:
+                msg = f"Multiple ports are not available: {', '.join(occupied_ports)}"
             raise SetupFailedError(msg)
-        # silence aioslimproto logger a bit
-        if self.logger.isEnabledFor(VERBOSE_LOG_LEVEL):
-            logging.getLogger("aioslimproto").setLevel(logging.DEBUG)
-        else:
-            logging.getLogger("aioslimproto").setLevel(self.logger.level + 10)
-        self.slimproto = SlimServer(
-            cli_port=telnet_port or None,
-            cli_port_json=json_port or None,
-            ip_address=self.mass.streams.publish_ip,
-            name="Music Assistant",
-            control_port=control_port,
-        )
-        # start slimproto socket server
-        await self.slimproto.start()
+
+    async def _cleanup_server(self) -> None:
+        """Ensure complete cleanup of the SlimProto server on initialization failure."""
+        if self.slimproto:
+            try:
+                await self.slimproto.stop()
+            except Exception as err:
+                self.logger.warning("Error stopping SlimProto server during cleanup: %s", err)
+            finally:
+                self.slimproto = None
+
+        # Clear any associated state that might have been created
+        self._players.clear()
+        self._multi_client_streams.clear()
 
     async def loaded_in_mass(self) -> None:
         """Call after the provider has been loaded."""
         await super().loaded_in_mass()
+        assert self.slimproto is not None  # for type checker
         self.slimproto.subscribe(self._handle_slimproto_event)
         self.mass.streams.register_dynamic_route(
             "/slimproto/multi", self._serve_multi_client_stream
@@ -96,28 +143,32 @@ class SqueezelitePlayerProvider(PlayerProvider):
 
     async def unload(self, is_removed: bool = False) -> None:
         """Handle unload/close of the provider."""
-        if self.slimproto:
-            await self.slimproto.stop()
+        # Ensure complete cleanup
+        await self._cleanup_server()
         self.mass.streams.unregister_dynamic_route("/slimproto/multi")
         self.mass.streams.unregister_dynamic_route("/jsonrpc.js")
 
     def get_corrected_elapsed_milliseconds(self, slimplayer: SlimClient) -> int:
         """Return corrected elapsed milliseconds for a slimplayer."""
-        sync_delay = self.mass.config.get_raw_player_config_value(
-            slimplayer.player_id, CONF_SYNC_ADJUST, 0
+        sync_delay = cast(
+            "int",
+            self.mass.config.get_raw_player_config_value(slimplayer.player_id, CONF_SYNC_ADJUST, 0),
         )
-        return slimplayer.elapsed_milliseconds - sync_delay
+        return cast("int", slimplayer.elapsed_milliseconds - sync_delay)
 
     def _handle_slimproto_event(
         self,
         event: SlimEvent,
     ) -> None:
-        if self.mass.closing:
+        """Handle events from SlimProto players."""
+        # Exit early if system is closing or slimproto server is not initialized
+        if self.mass.closing or not self.slimproto:
             return
 
-        # handle new player connect (or reconnect of existing player)
+        # Handle new player connect (or reconnect of existing player)
         if event.type == SlimEventType.PLAYER_CONNECTED:
-            if not (slimclient := self.slimproto.get_player(event.player_id)):
+            slimclient = self.slimproto.get_player(event.player_id)
+            if not slimclient:
                 return  # should not happen, but guard anyways
             player = SqueezelitePlayer(self, event.player_id, slimclient)
             self.mass.create_task(player.setup())
@@ -128,7 +179,7 @@ class SqueezelitePlayerProvider(PlayerProvider):
         if TYPE_CHECKING:
             player = cast("SqueezelitePlayer", player)
 
-        # handle player disconnect
+        # Handle player disconnect
         if event.type == SlimEventType.PLAYER_DISCONNECTED:
             self.mass.create_task(self.mass.players.unregister(player.player_id))
             return