)
self._http_session: ClientSession | None = None
self._http_session_no_ssl: ClientSession | None = None
+ self._mdns_locks: dict[str, asyncio.Lock] = {}
async def start(self) -> None:
"""Start running the Music Assistant server."""
# If a provider fails, that will not block the loading of other providers.
self.create_task(self.load_provider(prov_conf.instance_id, allow_retry=True))
- async def _load_provider(self, conf: ProviderConfig) -> None:
+ async def _load_provider(self, conf: ProviderConfig) -> None: # noqa: PLR0915
"""Load (or reload) a provider."""
# if provider is already loaded, stop and unload it first
await self.unload_provider(conf.instance_id)
)
provider.available = True
- self.create_task(provider.loaded_in_mass())
+ # execute post load actions
+ async def _on_provider_loaded() -> None:
+ await provider.loaded_in_mass()
+ if provider.type != ProviderType.PLAYER:
+ return
+ # add mdns discovery if needed
+ if provider.instance_id not in self._mdns_locks:
+ self._mdns_locks[provider.instance_id] = asyncio.Lock()
+ async with self._mdns_locks[provider.instance_id]:
+ for mdns_type in provider.manifest.mdns_discovery or []:
+ for mdns_name in set(self.aiozc.zeroconf.cache.cache):
+ if mdns_type not in mdns_name or mdns_type == mdns_name:
+ continue
+ info = AsyncServiceInfo(mdns_type, mdns_name)
+ if await info.async_request(self.aiozc.zeroconf, 3000):
+ await provider.on_mdns_service_state_change(
+ mdns_name, ServiceStateChange.Added, info
+ )
+
+ self.create_task(_on_provider_loaded())
+
+ # clear any previous error in config and signal update
self.config.set(f"{CONF_PROVIDERS}/{conf.instance_id}/last_error", None)
self.signal_event(EventType.PROVIDERS_UPDATED, data=self.get_providers())
await self._update_available_providers_cache()
"""Handle MDNS service state callback."""
async def process_mdns_state_change(prov: ProviderInstanceType) -> None:
+ if prov.instance_id not in self._mdns_locks:
+ self._mdns_locks[prov.instance_id] = asyncio.Lock()
if state_change == ServiceStateChange.Removed:
info = None
else:
info = AsyncServiceInfo(service_type, name)
await info.async_request(zeroconf, 3000)
- await prov.on_mdns_service_state_change(name, state_change, info)
+ # use a lock per provider instance to avoid
+ # race conditions in processing mdns events
+ async with self._mdns_locks[prov.instance_id]:
+ await prov.on_mdns_service_state_change(name, state_change, info)
LOGGER.log(
VERBOSE_LOG_LEVEL,
from typing import TYPE_CHECKING
-from zeroconf import ServiceStateChange
-from zeroconf.asyncio import AsyncServiceInfo
-
from .provider import Provider
if TYPE_CHECKING:
async def discover_players(self) -> None:
"""Discover players for this provider."""
- # This will be called (once) when the player provider is loaded into MA.
- # Default implementation is mdns discovery, which will also automatically
- # discovery players during runtime. If a provider overrides this method and
- # doesn't use mdns, it is responsible for periodically searching for new players.
- if not self.available:
- return
- for mdns_type in self.manifest.mdns_discovery or []:
- for mdns_name in set(self.mass.aiozc.zeroconf.cache.cache):
- if mdns_type not in mdns_name or mdns_type == mdns_name:
- continue
- info = AsyncServiceInfo(mdns_type, mdns_name)
- if await info.async_request(self.mass.aiozc.zeroconf, 3000):
- await self.on_mdns_service_state_change(
- mdns_name, ServiceStateChange.Added, info
- )
+ # This will be called when the player provider is (re)loaded into MA.
+ # For providers that support dynamic discovery of players via mdns,
+ # there is no need to implement this method.
@property
def players(self) -> list[Player]: