import logging
import os
from collections.abc import Awaitable, Callable, Coroutine
-from typing import TYPE_CHECKING, Any, Self
+from typing import TYPE_CHECKING, Any, Self, TypeVar
from uuid import uuid4
import aiofiles
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
PROVIDERS_PATH = os.path.join(BASE_DIR, "providers")
+_R = TypeVar("_R")
+
class MusicAssistant:
"""Main MusicAssistant (Server) object."""
def create_task(
self,
- target: Coroutine | Awaitable | Callable,
+ target: Coroutine[Any, Any, _R] | Awaitable[_R] | Callable[..., _R],
*args: Any,
task_id: str | None = None,
abort_existing: bool = False,
**kwargs: Any,
- ) -> asyncio.Task | asyncio.Future:
+ ) -> asyncio.Task[_R] | asyncio.Future[_R]:
"""Create Task on (main) event loop from Coroutine(function).
Tasks created by this helper will be properly cancelled on stop.
import logging
from collections import OrderedDict
from dataclasses import dataclass, field
-from typing import TYPE_CHECKING
+from typing import TYPE_CHECKING, cast
from music_assistant_models.config_entries import ConfigEntry, ConfigValueType
from music_assistant_models.enums import (
from music_assistant_models.provider import ProviderManifest
from soco.core import SoCo
- from music_assistant import MusicAssistant
+ from music_assistant.mass import MusicAssistant
from music_assistant.models import ProviderInstanceType
class SonosPlayerProvider(PlayerProvider):
"""Sonos Player provider."""
- sonosplayers: dict[str, SonosPlayer] | None = None
_discovery_running: bool = False
_discovery_reschedule_timer: asyncio.TimerHandle | None = None
+ def __init__(self, mass: MusicAssistant, manifest: ProviderManifest, config: ProviderConfig):
+ """Handle initialization of the provider."""
+ super().__init__(mass, manifest, config)
+ self.sonosplayers: OrderedDict[str, SonosPlayer] = OrderedDict()
+
@property
def supported_features(self) -> set[ProviderFeature]:
"""Return the features supported by this Provider."""
async def handle_async_init(self) -> None:
"""Handle async initialization of the provider."""
- self.sonosplayers: OrderedDict[str, SonosPlayer] = OrderedDict()
self.topology_condition = asyncio.Condition()
self.boot_counts: dict[str, int] = {}
self.mdns_names: dict[str, str] = {}
await asyncio.gather(*(player.offline() for player in self.sonosplayers.values()))
if events_asyncio.event_listener:
await events_asyncio.event_listener.async_stop()
- self.sonosplayers = None
+ self.sonosplayers = OrderedDict()
async def get_player_config_entries(
self,
"""Handle PLAY MEDIA on given player."""
sonos_player = self.sonosplayers[player_id]
mass_player = self.mass.players.get(player_id)
+ assert mass_player
if sonos_player.sync_coordinator:
# this should be already handled by the player manager, but just in case...
msg = (
media.uri = media.uri.replace(".flac", ".mp3")
didl_metadata = create_didl_metadata(media)
# set crossfade according to player setting
- crossfade = await self.mass.config.get_player_config_value(player_id, CONF_CROSSFADE)
+ crossfade = bool(await self.mass.config.get_player_config_value(player_id, CONF_CROSSFADE))
if sonos_player.crossfade != crossfade:
def set_crossfade() -> None:
self._discovery_running = True
try:
self.logger.debug("Sonos discovery started...")
- discovered_devices: set[SoCo] = discover(
- timeout=30, household_id=household_id, allow_network_scan=allow_network_scan
+ discovered_devices: set[SoCo] = (
+ discover(
+ timeout=30, household_id=household_id, allow_network_scan=allow_network_scan
+ )
+ or set()
)
- if discovered_devices is None:
- discovered_devices = set()
+
# process new players
for soco in discovered_devices:
try:
async def discover_household_ids(mass: MusicAssistant, prefer_s1: bool = True) -> list[str]:
"""Discover the HouseHold ID of S1 speaker(s) the network."""
if cache := await mass.cache.get("sonos_household_ids"):
- return cache
+ return cast(list[str], cache)
household_ids: list[str] = []
def get_all_sonos_ips() -> set[SoCo]:
from soco.exceptions import SoCoException, SoCoUPnPException
if TYPE_CHECKING:
- from . import SonosPlayer
+ from .player import SonosPlayer
UID_PREFIX = "RINCON_"
"""Extract the best available target identifier from the provided instance object."""
if zone_name := getattr(instance, "zone_name", None):
# SonosPlayer instance
- return zone_name
+ return str(zone_name)
if soco := getattr(instance, "soco", fallback_soco):
# Holds a SoCo instance attribute
# Only use attributes with no I/O
- return soco._player_name or soco.ip_address
+ return str(soco._player_name or soco.ip_address)
return None
"""Ensure I/O attributes are cached and return visible zones."""
_ = soco.household_id
_ = soco.uid
- return soco.visible_zones
+ return soco.visible_zones or set()
"""Return zone name."""
if self.mass_player:
return self.mass_player.display_name
- return self.soco.speaker_info["zone_name"]
+ return str(self.soco.speaker_info["zone_name"])
@property
def subscription_address(self) -> str:
async def poll_speaker(self) -> None:
"""Poll the speaker for updates."""
- def _poll():
+ def _poll() -> None:
"""Poll the speaker for updates (NOT async friendly)."""
self.update_groups()
self.poll_media()
self._set_basic_track_info(update_position=update_position)
self.update_player()
- async def _subscribe_target(self, target: SubscriptionBase, sub_callback: Callable) -> None:
+ async def _subscribe_target(
+ self, target: SubscriptionBase, sub_callback: Callable[[SonosEvent], None]
+ ) -> None:
"""Create a Sonos subscription for given target."""
subscription = await target.subscribe(
auto_renew=True, requested_timeout=SUBSCRIPTION_TIMEOUT
self.logger.debug("%s was missing, adding to %s group", missing_zone, self.zone_name)
self.update_groups()
- def create_update_groups_coro(self, event: SonosEvent | None = None) -> Coroutine:
+ def create_update_groups_coro(
+ self, event: SonosEvent | None = None
+ ) -> Coroutine[Any, Any, None]:
"""Handle callback for topology change event."""
def _get_soco_group() -> list[str]:
self.mass.loop.call_soon_threadsafe(self.mass.players.update, self.player_id)
for joined_uid in group[1:]:
- joined_speaker: SonosPlayer = self.sonos_prov.sonosplayers.get(joined_uid)
+ joined_speaker = self.sonos_prov.sonosplayers.get(joined_uid)
if joined_speaker:
joined_speaker.sync_coordinator = self
joined_speaker.group_members = group_members
@soco_error()
def _join(self, members: list[SonosPlayer]) -> list[SonosPlayer]:
if self.sync_coordinator:
- self.unjoin()
+ self._unjoin()
group = [self]
else:
group = self.group_members.copy()
return track_info
-def _convert_state(sonos_state: str) -> PlayerState:
+def _convert_state(sonos_state: str | None) -> PlayerState:
"""Convert Sonos state to PlayerState."""
if sonos_state == "PLAYING":
return PlayerState.PLAYING
return PlayerState.IDLE
-def _timespan_secs(timespan):
+def _timespan_secs(timespan: str | None) -> int | None:
"""Parse a time-span into number of seconds."""
- if timespan in ("", "NOT_IMPLEMENTED", None):
+ if timespan in ("", "NOT_IMPLEMENTED"):
+ return None
+ if timespan is None:
return None
- return sum(60 ** x[0] * int(x[1]) for x in enumerate(reversed(timespan.split(":"))))
+ return int(sum(60 ** x[0] * int(x[1]) for x in enumerate(reversed(timespan.split(":")))))
'^music_assistant/providers/siriusxm/.*$',
'^music_assistant/providers/slimproto/.*$',
'^music_assistant/providers/sonos/.*$',
- '^music_assistant/providers/sonos_s1/.*$',
'^music_assistant/providers/soundcloud/.*$',
'^music_assistant/providers/snapcast/.*$',
'^music_assistant/providers/spotify/.*$',