small improvements
authorMarcel van der Veldt <m.vanderveldt@outlook.com>
Wed, 6 Apr 2022 15:40:13 +0000 (17:40 +0200)
committerMarcel van der Veldt <m.vanderveldt@outlook.com>
Wed, 6 Apr 2022 15:40:13 +0000 (17:40 +0200)
music_assistant/controllers/players.py
music_assistant/controllers/stream.py
music_assistant/mass.py
music_assistant/models/player.py

index a77e63fe7490dc6d877fe6637297e7340b00f808..15efad111252deabe3eec8d4cbdfad7af7764a5a 100755 (executable)
@@ -73,6 +73,8 @@ class PlayerController:
 
     async def register_player(self, player: PlayerType) -> None:
         """Register a new player on the controller."""
+        if self.mass.closed:
+            return
         player_id = player.player_id
 
         if player_id in self._players:
index c94445af3a8fad23b3636d3f7e38b51ffd8d8f2a..bc306fd6bc32f49441e71a8d6be3c31046bc29bf 100644 (file)
@@ -58,17 +58,13 @@ class StreamController:
         runner = web.AppRunner(app, access_log=None)
         await runner.setup()
         # set host to None to bind to all addresses on both IPv4 and IPv6
-        http_site = web.TCPSite(
-            runner, host=None, port=self._port, reuse_address=True, reuse_port=True
-        )
+        http_site = web.TCPSite(runner, host=None, port=self._port)
         await http_site.start()
 
         async def on_shutdown_event(*args, **kwargs):
             """Handle shutdown event."""
-            for task in self._stream_tasks.values():
-                task.cancel()
             await http_site.stop()
-            await runner.shutdown()
+            await runner.cleanup()
             await app.shutdown()
             await app.cleanup()
             self.logger.info("Streamserver exited.")
index b3aa6343e4dae8e95094a896b4e113131385a0da..6d208c6bcd654d8ca35898cd4d44e8c16f2fc1e7 100644 (file)
@@ -54,6 +54,7 @@ class MusicAssistant:
         self.music = MusicController(self)
         self.players = PlayerController(self, stream_port)
         self._tracked_tasks: List[asyncio.Task] = []
+        self.closed = False
 
     async def setup(self) -> None:
         """Async setup of music assistant."""
@@ -63,7 +64,7 @@ class MusicAssistant:
         if not self.http_session:
             self.http_session = aiohttp.ClientSession(
                 loop=self.loop,
-                connector=aiohttp.TCPConnector(enable_cleanup_closed=True, ssl=False),
+                connector=aiohttp.TCPConnector(ssl=False),
             )
         # setup core controllers
         await self.cache.setup()
@@ -75,15 +76,13 @@ class MusicAssistant:
     async def stop(self) -> None:
         """Stop running the music assistant server."""
         self.logger.info("Stop called, cleaning up...")
-        # cancel any running tasks
+        # cancel all running tasks
         for task in self._tracked_tasks:
             task.cancel()
         self.signal_event(EventType.SHUTDOWN)
-        # wait for any remaining tasks launched by the shutdown event
-        await asyncio.wait_for(asyncio.wait(self._tracked_tasks), 2)
+        self.closed = True
         if self.http_session and not self.http_session_provided:
-            await self.http_session.connector.close()
-            self.http_session.detach()
+            await self.http_session.close()
 
     def signal_event(self, event_type: EventType, event_details: Any = None) -> None:
         """
@@ -92,6 +91,8 @@ class MusicAssistant:
             :param event_msg: the eventmessage to signal
             :param event_details: optional details to send with the event.
         """
+        if self.closed:
+            return
         for cb_func, event_filter in self._listeners:
             if event_filter is None or event_type in event_filter:
                 self.create_task(cb_func, event_type, event_details)
@@ -133,8 +134,10 @@ class MusicAssistant:
         """
         Create Task on (main) event loop from Callable or awaitable.
 
-        Tasks create dby this helper will be properly cancelled on stop.
+        Tasks createby this helper will be properly cancelled on stop.
         """
+        if self.closed:
+            return
 
         # Check for partials to properly determine if coroutine function
         check_target = target
@@ -163,12 +166,10 @@ class MusicAssistant:
                 task = self.loop.create_task(executor_wrapper(target, *args, **kwargs))
 
         def task_done_callback(*args, **kwargs):
-            self.logger.debug("task finished %s", task.get_name())
             self._tracked_tasks.remove(task)
 
         self._tracked_tasks.append(task)
         task.add_done_callback(task_done_callback)
-        self.logger.debug("spawned task %s", task.get_name())
         return task
 
     async def __process_jobs(self):
index cb74b91fcea7a1526605b132b1999cc5c4bea84a..2f6bba212c6862cc478a4689aa03f817a5623ed7 100755 (executable)
@@ -44,6 +44,8 @@ class Player(ABC):
     """Model for a music player."""
 
     player_id: str
+    # mass object will be set by playermanager at register
+    mass: MusicAssistant = None  # type: ignore[assignment]
     _attr_is_group: bool = False
     _attr_group_childs: List[str] = []
     _attr_name: str = ""
@@ -57,8 +59,7 @@ class Player(ABC):
     _attr_max_sample_rate: int = 96000
     _attr_active_queue_id: str = ""
     _attr_use_multi_stream: bool = False
-    # mass object will be set by playermanager at register
-    mass: MusicAssistant = None  # type: ignore[assignment]
+    _attr_group_parent: List[str] = []  # will be set by player manager
 
     @property
     def name(self) -> bool:
@@ -189,13 +190,18 @@ class Player(ABC):
 
     def update_state(self) -> None:
         """Update current player state in the player manager."""
+        if self.mass is None or self.mass.closed:
+            # guard
+            return
+        self._attr_group_childs = self.get_group_parents()
         # determine active queue for player
         queue_id = self.player_id
-        for player_id in self.get_group_parents():
-            if player := self.mass.players.get_player(player_id):
-                if player.state in [PlayerState.PLAYING, PlayerState.PAUSED]:
-                    queue_id = player_id
-                    break
+        for state in [PlayerState.PLAYING, PlayerState.PAUSED]:
+            for player_id in self._attr_group_childs:
+                if player := self.mass.players.get_player(player_id):
+                    if player.state == state:
+                        queue_id = player_id
+                        break
         self._attr_active_queue_id = queue_id
         # basic throttle: do not send state changed events if player did not change
         prev_state = getattr(self, "_prev_state", None)
@@ -212,7 +218,7 @@ class Player(ABC):
                     self.mass.create_task(player.update_state)
         else:
             # update group player when child updates
-            for group_player_id in self.get_group_parents():
+            for group_player_id in self._attr_group_childs:
                 if player := self.mass.players.get_player(group_player_id):
                     self.mass.create_task(player.update_state)
 
@@ -235,6 +241,7 @@ class Player(ABC):
             "available": self.available,
             "is_group": self.is_group,
             "group_childs": self.group_childs,
+            "group_parents": self._attr_group_childs,
             "volume_level": int(self.volume_level),
             "device_info": self.device_info.to_dict(),
             "active_queue": self.active_queue.queue_id,