MessageCallback = Callable[[dict[str, Any]], None]
-def send(json_msg: dict[str, Any]):
+def send(json_msg: dict[str, Any]) -> None:
"""Send a message to stdout."""
sys.stdout.write(json.dumps(json_msg))
sys.stdout.write("\n")
self.api_port = api_port
self.streamserver_ip = streamserver_ip
self.streamserver_port = streamserver_port
- self._metadata = {}
- self._properties = {}
+ self._metadata: dict[str, Any] = {}
+ self._properties: dict[str, Any] = {}
self._request_callbacks: dict[str, MessageCallback] = {}
self._seek_offset = 0.0
self.websocket = websocket.WebSocketApp(
self.websocket_thread.name = "massControl"
self.websocket_thread.start()
- def stop(self):
+ def stop(self) -> None:
"""Stop the websocket thread."""
self._stopped = True
self.websocket.close()
# self.send_request("core.mixer.set_mute", {"mute": properties["mute"]})
elif cmd == "GetProperties":
- def handle_result(result: dict[str, Any]):
+ def handle_result(result: dict[str, Any]) -> None:
send(
{
"jsonrpc": "2.0",
"""Send stream ready notification to Snapcast."""
send({"jsonrpc": "2.0", "method": "Plugin.Stream.Ready"})
- def _websocket_loop(self):
+ def _websocket_loop(self) -> None:
logger.info("Started websocket loop")
while not self._stopped:
try:
return properties
- def _on_ws_message(self, ws, message: str):
+ def _on_ws_message(self, ws: websocket.WebSocket, message: str) -> None:
# TODO: error handling
logger.debug("websocket message received: %s", message)
data = json.loads(message)
self.send_snapcast_properties_notification(properties)
return
- def _on_ws_error(self, ws, error):
+ def _on_ws_error(self, ws: websocket.WebSocket, error: Exception | str) -> None:
logger.error("Websocket error")
logger.error(error)
- def _on_ws_open(self, ws):
+ def _on_ws_open(self, ws: websocket.WebSocket) -> None:
logger.info("Snapcast RPC websocket opened")
self.send_snapcast_stream_ready_notification()
- def _on_ws_close(self, ws, close_status_code, close_msg):
+ def _on_ws_close(
+ self, ws: websocket.WebSocket, close_status_code: int | None, close_msg: str | None
+ ) -> None:
logger.info("Snapcast RPC websocket closed")
def send_request(
snap_client_id: str,
) -> None:
"""Init."""
- self.provider: SnapCastProvider
+ self.provider: SnapCastProvider # type: ignore[misc]
self.snap_client = snap_client
self.snap_client_id = snap_client_id
super().__init__(provider, player_id)
- self._stream_task: asyncio.Task | None = None
+ self._stream_task: asyncio.Task[None] | None = None
@property
def synced_to(self) -> str | None:
f"--streamserver-ip={self.mass.streams.publish_ip}%20"
f"--streamserver-port={self.mass.streams.publish_port}"
)
- extra_args = ""
else:
extra_args = ""
if self.synced_to is not None:
return
self._attr_group_members.append(self.player_id)
- {
- self._attr_group_members.append(self.provider._get_ma_id(snap_client_id))
- for snap_client_id in snap_group.clients
- if self.provider._get_ma_id(snap_client_id) != self.player_id
- and self.provider._snapserver.client(snap_client_id).connected
- }
+ for snap_client_id in snap_group.clients:
+ if (
+ self.provider._get_ma_id(snap_client_id) != self.player_id
+ and self.provider._snapserver.client(snap_client_id).connected
+ ):
+ self._attr_group_members.append(self.provider._get_ma_id(snap_client_id))
self.update_state()
"""SnapCastProvider."""
_snapserver: Snapserver
- _snapserver_runner: asyncio.Task | None
+ _snapserver_runner: asyncio.Task[None] | None
_snapserver_started: asyncio.Event | None
_snapcast_server_host: str
_snapcast_server_control_port: int
]
async with AsyncProcess(args, stdout=True, name="snapserver") as snapserver_proc:
# keep reading from stdout until exit
- async for data in snapserver_proc.iter_any():
- data = data.decode().strip() # noqa: PLW2901
- for line in data.split("\n"):
+ async for raw_data in snapserver_proc.iter_any():
+ text = raw_data.decode().strip()
+ for line in text.split("\n"):
logger.debug(line)
if "(Snapserver) Version 0." in line:
# delay init a small bit to prevent race conditions
assert snap_id is not None # for type checking
return snap_id
- def _generate_and_register_id(self, snap_client_id) -> str:
+ def _generate_and_register_id(self, snap_client_id: str) -> str:
search_dict = self._ids_map.inverse
if snap_client_id not in search_dict:
new_id = "ma_" + str(re.sub(r"\W+", "", snap_client_id))
if ma_player := self._handle_player_init(snap_client):
snap_client.set_callback(ma_player._handle_player_update)
for snap_client in self._snapserver.clients:
- if ma_player := self.mass.players.get(self._get_ma_id(snap_client.identifier)):
- assert isinstance(ma_player, SnapCastPlayer) # for type checking
+ if player := self.mass.players.get(self._get_ma_id(snap_client.identifier)):
+ ma_player = cast("SnapCastPlayer", player)
snap_client.set_callback(ma_player._handle_player_update)
for snap_group in self._snapserver.groups:
snap_group.set_callback(self._handle_group_update)
'^music_assistant/providers/bluesound/.*$',
'^music_assistant/providers/chromecast/.*$',
'^music_assistant/providers/sonos/.*$',
- '^music_assistant/providers/snapcast/.*$',
'^music_assistant/providers/ytmusic/.*$',
]
extra_checks = false