chore: mypy for airplay (#1848)
authorJc2k <john.carr@unrouted.co.uk>
Thu, 9 Jan 2025 20:46:35 +0000 (20:46 +0000)
committerGitHub <noreply@github.com>
Thu, 9 Jan 2025 20:46:35 +0000 (21:46 +0100)
music_assistant/providers/airplay/__init__.py
music_assistant/providers/airplay/helpers.py
music_assistant/providers/airplay/provider.py
music_assistant/providers/airplay/raop.py
pyproject.toml

index 83081ea52d86974f646d462a71344e7c3fc0ecbb..ee3282cbf1cc840f362ea39bf89e93f41f644d81 100644 (file)
@@ -8,7 +8,7 @@ from music_assistant_models.config_entries import ConfigEntry, ConfigValueType,
 from music_assistant_models.enums import ConfigEntryType
 from music_assistant_models.provider import ProviderManifest
 
-from music_assistant import MusicAssistant
+from music_assistant.mass import MusicAssistant
 
 from .const import CONF_BIND_INTERFACE
 from .provider import AirplayProvider
@@ -17,7 +17,6 @@ if TYPE_CHECKING:
     from music_assistant_models.config_entries import ProviderConfig
     from music_assistant_models.provider import ProviderManifest
 
-    from music_assistant import MusicAssistant
     from music_assistant.models import ProviderInstanceType
 
 
index ddb6c63a83436cced0b58ce89c29767b02a59fed..4ecb594b4e3ef9a0bd72f183cd789e81c2880eab 100644 (file)
@@ -30,8 +30,6 @@ def get_model_info(info: AsyncServiceInfo) -> tuple[str, str]:
         return (manufacturer, model)
     # try parse from am property
     if am_property := info.decoded_properties.get("am"):
-        if isinstance(am_property, bytes):
-            am_property = am_property.decode("utf-8")
         model = am_property
 
     if not model:
index d44963031c78ae254c460db502c914aa0601ce9e..3549b4898dd7672dfe17c3d596aa8602f36c70a7 100644 (file)
@@ -8,7 +8,7 @@ import platform
 import socket
 import time
 from random import randrange
-from typing import TYPE_CHECKING
+from typing import cast
 
 from music_assistant_models.config_entries import ConfigEntry
 from music_assistant_models.enums import (
@@ -36,12 +36,13 @@ from music_assistant.constants import (
     CONF_ENTRY_SYNC_ADJUST,
     create_sample_rates_config_entry,
 )
-from music_assistant.helpers.audio import get_ffmpeg_stream
 from music_assistant.helpers.datetime import utc
+from music_assistant.helpers.ffmpeg import get_ffmpeg_stream
 from music_assistant.helpers.process import check_output
 from music_assistant.helpers.util import TaskManager, get_ip_pton, lock, select_free_port
 from music_assistant.models.player_provider import PlayerProvider
 from music_assistant.providers.airplay.raop import RaopStreamSession
+from music_assistant.providers.player_group import PlayerGroupProvider
 
 from .const import (
     AIRPLAY_FLOW_PCM_FORMAT,
@@ -61,10 +62,6 @@ from .helpers import (
 )
 from .player import AirPlayPlayer
 
-if TYPE_CHECKING:
-    from music_assistant.providers.player_group import PlayerGroupProvider
-
-
 PLAYER_CONFIG_ENTRIES = (
     CONF_ENTRY_FLOW_MODE_ENFORCED,
     CONF_ENTRY_CROSSFADE,
@@ -138,15 +135,15 @@ BROKEN_RAOP_WARN = ConfigEntry(
 class AirplayProvider(PlayerProvider):
     """Player provider for Airplay based players."""
 
-    cliraop_bin: str | None = None
+    cliraop_bin: str | None
     _players: dict[str, AirPlayPlayer]
-    _dacp_server: asyncio.Server = None
-    _dacp_info: AsyncServiceInfo = None
+    _dacp_server: asyncio.Server
+    _dacp_info: AsyncServiceInfo
 
     @property
     def supported_features(self) -> set[ProviderFeature]:
         """Return the features supported by this Provider."""
-        return (ProviderFeature.SYNC_PLAYERS,)
+        return {ProviderFeature.SYNC_PLAYERS}
 
     async def handle_async_init(self) -> None:
         """Handle async initialization of the provider."""
@@ -179,10 +176,12 @@ class AirplayProvider(PlayerProvider):
         self, name: str, state_change: ServiceStateChange, info: AsyncServiceInfo | None
     ) -> None:
         """Handle MDNS service state callback."""
+        if not info:
+            return
         if "@" in name:
             raw_id, display_name = name.split(".")[0].split("@", 1)
-        elif "deviceid" in info.decoded_properties:
-            raw_id = info.decoded_properties["deviceid"].replace(":", "")
+        elif deviceid := info.decoded_properties.get("deviceid"):
+            raw_id = deviceid.replace(":", "")
             display_name = info.name.split(".")[0]
         else:
             return
@@ -263,6 +262,8 @@ class AirplayProvider(PlayerProvider):
         - player_id: player_id of the player to handle the command.
         """
         player = self.mass.players.get(player_id)
+        if not player:
+            return
         if player.group_childs:
             # pause is not supported while synced, use stop instead
             self.logger.debug("Player is synced, using STOP instead of PAUSE")
@@ -279,6 +280,8 @@ class AirplayProvider(PlayerProvider):
     ) -> None:
         """Handle PLAY MEDIA on given player."""
         player = self.mass.players.get(player_id)
+        if not player:
+            return
         # set the active source for the player to the media queue
         # this accounts for syncgroups and linked players (e.g. sonos)
         player.active_source = media.queue_id
@@ -300,7 +303,7 @@ class AirplayProvider(PlayerProvider):
             )
         elif media.queue_id and media.queue_id.startswith("ugp_"):
             # special case: UGP stream
-            ugp_provider: PlayerGroupProvider = self.mass.get_provider("player_group")
+            ugp_provider = cast(PlayerGroupProvider, self.mass.get_provider("player_group"))
             ugp_stream = ugp_provider.ugp_streams[media.queue_id]
             input_format = ugp_stream.output_format
             audio_source = ugp_stream.subscribe()
@@ -338,6 +341,8 @@ class AirplayProvider(PlayerProvider):
         if airplay_player.raop_stream and airplay_player.raop_stream.running:
             await airplay_player.raop_stream.send_cli_command(f"VOLUME={volume_level}\n")
         mass_player = self.mass.players.get(player_id)
+        if not mass_player:
+            return
         mass_player.volume_level = volume_level
         mass_player.volume_muted = volume_level == 0
         self.mass.players.update(player_id)
@@ -404,22 +409,24 @@ class AirplayProvider(PlayerProvider):
             - player_id: player_id of the player to handle the command.
         """
         mass_player = self.mass.players.get(player_id, raise_unavailable=True)
-        if not mass_player.synced_to:
+        if not mass_player or not mass_player.synced_to:
             return
         ap_player = self._players[player_id]
         if ap_player.raop_stream and ap_player.raop_stream.running:
             await ap_player.raop_stream.session.remove_client(ap_player)
         group_leader = self.mass.players.get(mass_player.synced_to, raise_unavailable=True)
+        assert group_leader
         if player_id in group_leader.group_childs:
             group_leader.group_childs.remove(player_id)
         mass_player.synced_to = None
         airplay_player = self._players.get(player_id)
-        await airplay_player.cmd_stop()
+        if airplay_player:
+            await airplay_player.cmd_stop()
         # make sure that the player manager gets an update
         self.mass.players.update(mass_player.player_id, skip_forward=True)
         self.mass.players.update(group_leader.player_id, skip_forward=True)
 
-    async def _getcliraop_binary(self):
+    async def _getcliraop_binary(self) -> str:
         """Find the correct raop/airplay binary belonging to the platform."""
         # ruff: noqa: SIM102
         if self.cliraop_bin is not None:
@@ -435,7 +442,8 @@ class AirplayProvider(PlayerProvider):
                     self.cliraop_bin = cliraop_path
                     return cliraop_path
             except OSError:
-                return None
+                pass
+            return None
 
         base_path = os.path.join(os.path.dirname(__file__), "bin")
         system = platform.system().lower().replace("darwin", "macos")
@@ -452,6 +460,7 @@ class AirplayProvider(PlayerProvider):
     def _get_sync_clients(self, player_id: str) -> list[AirPlayPlayer]:
         """Get all sync clients for a player."""
         mass_player = self.mass.players.get(player_id, True)
+        assert mass_player
         sync_clients: list[AirPlayPlayer] = []
         # we need to return the player itself too
         group_child_ids = {player_id}
@@ -541,15 +550,15 @@ class AirplayProvider(PlayerProvider):
             else:
                 headers_raw = request
                 body = ""
-            headers_raw = headers_raw.split("\r\n")
+            headers_split = headers_raw.split("\r\n")
             headers = {}
-            for line in headers_raw[1:]:
+            for line in headers_split[1:]:
                 if ":" not in line:
                     continue
                 x, y = line.split(":", 1)
                 headers[x.strip()] = y.strip()
             active_remote = headers.get("Active-Remote")
-            _, path, _ = headers_raw[0].split(" ")
+            _, path, _ = headers_split[0].split(" ")
             airplay_player = next(
                 (
                     x
@@ -570,6 +579,8 @@ class AirplayProvider(PlayerProvider):
 
             player_id = airplay_player.player_id
             mass_player = self.mass.players.get(player_id)
+            if not mass_player:
+                return
             active_queue = self.mass.player_queues.get_active_queue(player_id)
             if path == "/ctrl-int/1/nextitem":
                 self.mass.create_task(self.mass.player_queues.next(active_queue.queue_id))
@@ -590,10 +601,10 @@ class AirplayProvider(PlayerProvider):
                 self.mass.create_task(self.mass.players.cmd_volume_down(player_id))
             elif path == "/ctrl-int/1/shuffle_songs":
                 queue = self.mass.player_queues.get(player_id)
-                self.mass.loop.call_soon(
-                    self.mass.player_queues.set_shuffle(
-                        active_queue.queue_id, not queue.shuffle_enabled
-                    )
+                if not queue:
+                    return
+                self.mass.player_queues.set_shuffle(
+                    active_queue.queue_id, not queue.shuffle_enabled
                 )
             elif path in ("/ctrl-int/1/pause", "/ctrl-int/1/discrete-pause"):
                 # sometimes this request is sent by a device as confirmation of a play command
@@ -650,11 +661,13 @@ class AirplayProvider(PlayerProvider):
         finally:
             writer.close()
 
-    async def monitor_prevent_playback(self, player_id: str):
+    async def monitor_prevent_playback(self, player_id: str) -> None:
         """Monitor the prevent playback state of an airplay player."""
         count = 0
         if not (airplay_player := self._players.get(player_id)):
             return
+        if not airplay_player.raop_stream:
+            return
         prev_active_remote_id = airplay_player.raop_stream.active_remote_id
         while count < 40:
             count += 1
index 98841c263f9ee84893ef8776ef12a26e9904cde9..27c6062aa0f678186c4b31ec91c0884ad2ecdbf1 100644 (file)
@@ -54,7 +54,7 @@ class RaopStreamSession:
         self.input_format = input_format
         self._sync_clients = sync_clients
         self._audio_source = audio_source
-        self._audio_source_task: asyncio.Task | None = None
+        self._audio_source_task: asyncio.Task[None] | None = None
         self._stopped: bool = False
         self._lock = asyncio.Lock()
 
@@ -75,14 +75,18 @@ class RaopStreamSession:
                         return
                     async with self._lock:
                         await asyncio.gather(
-                            *[x.raop_stream.write_chunk(chunk) for x in self._sync_clients],
+                            *[
+                                x.raop_stream.write_chunk(chunk)
+                                for x in self._sync_clients
+                                if x.raop_stream
+                            ],
                             return_exceptions=True,
                         )
                 # entire stream consumed: send EOF
                 generator_exhausted = True
                 async with self._lock:
                     await asyncio.gather(
-                        *[x.raop_stream.write_eof() for x in self._sync_clients],
+                        *[x.raop_stream.write_eof() for x in self._sync_clients if x.raop_stream],
                         return_exceptions=True,
                     )
             finally:
@@ -90,12 +94,17 @@ class RaopStreamSession:
                     await close_async_generator(self._audio_source)
 
         # get current ntp and start RaopStream per player
+        assert self.prov.cliraop_bin
         _, stdout = await check_output(self.prov.cliraop_bin, "-ntp")
         start_ntp = int(stdout.strip())
         wait_start = 1500 + (250 * len(self._sync_clients))
         async with self._lock:
             await asyncio.gather(
-                *[x.raop_stream.start(start_ntp, wait_start) for x in self._sync_clients],
+                *[
+                    x.raop_stream.start(start_ntp, wait_start)
+                    for x in self._sync_clients
+                    if x.raop_stream
+                ],
                 return_exceptions=True,
             )
         self._audio_source_task = asyncio.create_task(audio_streamer())
@@ -116,6 +125,7 @@ class RaopStreamSession:
         """Remove a sync client from the session."""
         if airplay_player not in self._sync_clients:
             return
+        assert airplay_player.raop_stream
         assert airplay_player.raop_stream.session == self
         async with self._lock:
             self._sync_clients.remove(airplay_player)
@@ -154,7 +164,7 @@ class RaopStream:
         # with the named pipe used to send audio
         self.active_remote_id: str = str(randint(1000, 8000))
         self.prevent_playback: bool = False
-        self._log_reader_task: asyncio.Task | None = None
+        self._log_reader_task: asyncio.Task[None] | asyncio.Future[None] | None = None
         self._cliraop_proc: AsyncProcess | None = None
         self._ffmpeg_proc: AsyncProcess | None = None
         self._started = asyncio.Event()
@@ -170,6 +180,8 @@ class RaopStream:
         extra_args = []
         player_id = self.airplay_player.player_id
         mass_player = self.mass.players.get(player_id)
+        if not mass_player:
+            return
         bind_ip = await self.mass.config.get_provider_config_value(
             self.prov.instance_id, CONF_BIND_INTERFACE
         )
@@ -240,10 +252,12 @@ class RaopStream:
         self._started.set()
         self._log_reader_task = self.mass.create_task(self._log_watcher())
 
-    async def stop(self):
+    async def stop(self) -> None:
         """Stop playback and cleanup."""
         if self._stopped:
             return
+        if not self._cliraop_proc:
+            return
         if self._cliraop_proc.proc and not self._cliraop_proc.closed:
             await self.send_cli_command("ACTION=STOP")
         self._stopped = True  # set after send_cli command!
@@ -259,6 +273,7 @@ class RaopStream:
         if self._stopped:
             return
         await self._started.wait()
+        assert self._ffmpeg_proc
         await self._ffmpeg_proc.write(chunk)
 
     async def write_eof(self) -> None:
@@ -266,6 +281,7 @@ class RaopStream:
         if self._stopped:
             return
         await self._started.wait()
+        assert self._ffmpeg_proc
         await self._ffmpeg_proc.write_eof()
 
     async def send_cli_command(self, command: str) -> None:
@@ -277,7 +293,7 @@ class RaopStream:
         if not command.endswith("\n"):
             command += "\n"
 
-        def send_data():
+        def send_data() -> None:
             with suppress(BrokenPipeError), open(named_pipe, "w") as f:
                 f.write(command)
 
@@ -290,11 +306,15 @@ class RaopStream:
         """Monitor stderr for the running CLIRaop process."""
         airplay_player = self.airplay_player
         mass_player = self.mass.players.get(airplay_player.player_id)
+        if not mass_player:
+            return
         queue = self.mass.player_queues.get_active_queue(mass_player.active_source)
         logger = airplay_player.logger
         lost_packets = 0
         prev_metadata_checksum: str = ""
         prev_progress_report: float = 0
+        if not self._cliraop_proc:
+            return
         async for line in self._cliraop_proc.iter_stderr():
             if "elapsed milliseconds:" in line:
                 # this is received more or less every second while playing
index 459189f26fe85b88b687be32d7b4ed0f1dd04593..8f9e5ab61a499231e7ba25b9fc975252ccd5d397 100644 (file)
@@ -144,7 +144,6 @@ exclude = [
     '^music_assistant/__main__\.py$',
     '^music_assistant/providers/_template_music_provider/.*$',
     '^music_assistant/providers/_template_player_provider/.*$',
-    '^music_assistant/providers/airplay/.*$',
     '^music_assistant/providers/apple_music/.*$',
     '^music_assistant/providers/bluesound/.*$',
     '^music_assistant/providers/chromecast/.*$',