from async_upnp_client.utils import CaseInsensitiveDict
from music_assistant.common.models.config_entries import ConfigEntry, ConfigValueType
-from music_assistant.common.models.enums import PlayerFeature, PlayerState, PlayerType
+from music_assistant.common.models.enums import (
+ ConfigEntryType,
+ PlayerFeature,
+ PlayerState,
+ PlayerType,
+)
from music_assistant.common.models.errors import PlayerUnavailableError, QueueEmpty
from music_assistant.common.models.player import DeviceInfo, Player
from music_assistant.common.models.queue_item import QueueItem
PlayerFeature.VOLUME_SET,
)
+CONF_NETWORK_SCAN = "network_scan"
+
_DLNAPlayerProviderT = TypeVar("_DLNAPlayerProviderT", bound="DLNAPlayerProvider")
_R = TypeVar("_R")
_P = ParamSpec("_P")
values: the (intermediate) raw values for config entries sent with the action.
"""
# ruff: noqa: ARG001
- return tuple() # we do not have any config entries (yet)
+ return (
+ ConfigEntry(
+ key=CONF_NETWORK_SCAN,
+ type=ConfigEntryType.BOOLEAN,
+ label="Allow network scan for discovery",
+ default_value=False,
+ description="Enable network scan for discovery of players. \n"
+ "Can be used if (some of) your players are not automatically discovered.",
+ ),
+ )
def catch_request_errors(
try:
self._discovery_running = True
self.logger.debug("DLNA discovery started...")
+ allow_network_scan = self.config.get_value(CONF_NETWORK_SCAN)
discovered_devices: set[str] = set()
async def on_response(discovery_info: CaseInsensitiveDict):
await self._device_discovered(ssdp_udn, discovery_info["location"])
- # we iterate between using a regular and multicast search
- if use_multicast:
+ # we iterate between using a regular and multicast search (if enabled)
+ if allow_network_scan and use_multicast:
await async_search(on_response, target=(str(IPv4Address("255.255.255.255")), 1900))
else:
await async_search(on_response)
self.mass.create_task(self._run_discovery(use_multicast=not use_multicast))
# reschedule self once finished
- self.mass.loop.call_later(120, reschedule)
+ self.mass.loop.call_later(300, reschedule)
async def _device_disconnect(self, dlna_player: DLNAPlayer) -> None:
"""
from soco.groups import ZoneGroup
from music_assistant.common.models.config_entries import ConfigEntry, ConfigValueType
-from music_assistant.common.models.enums import PlayerFeature, PlayerState, PlayerType
+from music_assistant.common.models.enums import (
+ ConfigEntryType,
+ PlayerFeature,
+ PlayerState,
+ PlayerType,
+)
from music_assistant.common.models.errors import PlayerUnavailableError, QueueEmpty
from music_assistant.common.models.player import DeviceInfo, Player
from music_assistant.common.models.queue_item import QueueItem
PlayerFeature.VOLUME_SET,
)
+CONF_NETWORK_SCAN = "network_scan"
+
# set event listener port to something other than 1400
# to allow coextistence with HA on the same host
config.EVENT_LISTENER_PORT = 1700
values: the (intermediate) raw values for config entries sent with the action.
"""
# ruff: noqa: ARG001
- return tuple() # we do not have any config entries (yet)
+ return (
+ ConfigEntry(
+ key=CONF_NETWORK_SCAN,
+ type=ConfigEntryType.BOOLEAN,
+ label="Enable network scan for discovery",
+ default_value=False,
+ description="Enable network scan for discovery of players. \n"
+ "Can be used if (some of) your players are not automatically discovered.",
+ ),
+ )
@dataclass
except ConnectionResetError as err:
raise PlayerUnavailableError from err
- async def _run_discovery(self, allow_network_scan=False) -> None:
+ async def _run_discovery(self) -> None:
"""Discover Sonos players on the network."""
if self._discovery_running:
return
self._discovery_running = True
self.logger.debug("Sonos discovery started...")
discovered_devices: set[soco.SoCo] = await asyncio.to_thread(
- soco.discover, allow_network_scan=allow_network_scan
+ soco.discover, allow_network_scan=self.config.get_value(CONF_NETWORK_SCAN)
)
if discovered_devices is None:
discovered_devices = set()
new_device_ids = {item.uid for item in discovered_devices}
cur_player_ids = set(self.sonosplayers.keys())
added_devices = new_device_ids.difference(cur_player_ids)
- removed_devices = cur_player_ids.difference(new_device_ids)
-
- # mark any disconnected players as unavailable...
- for player_id in removed_devices:
- if player := self.mass.players.get(player_id):
- player.available = False
- self.mass.players.update(player_id)
# process new players
for device in discovered_devices:
def reschedule():
self._discovery_reschedule_timer = None
- self.mass.create_task(self._run_discovery(allow_network_scan=not allow_network_scan))
+ self.mass.create_task(self._run_discovery())
# reschedule self once finished
- self._discovery_reschedule_timer = self.mass.loop.call_later(120, reschedule)
+ self._discovery_reschedule_timer = self.mass.loop.call_later(300, reschedule)
async def _device_discovered(self, soco_device: soco.SoCo) -> None:
"""Handle discovered Sonos player."""