async def register_player(self, player: PlayerType) -> None:
"""Register a new player on the controller."""
+ if self.mass.closed:
+ return
player_id = player.player_id
if player_id in self._players:
runner = web.AppRunner(app, access_log=None)
await runner.setup()
# set host to None to bind to all addresses on both IPv4 and IPv6
- http_site = web.TCPSite(
- runner, host=None, port=self._port, reuse_address=True, reuse_port=True
- )
+ http_site = web.TCPSite(runner, host=None, port=self._port)
await http_site.start()
async def on_shutdown_event(*args, **kwargs):
"""Handle shutdown event."""
- for task in self._stream_tasks.values():
- task.cancel()
await http_site.stop()
- await runner.shutdown()
+ await runner.cleanup()
await app.shutdown()
await app.cleanup()
self.logger.info("Streamserver exited.")
self.music = MusicController(self)
self.players = PlayerController(self, stream_port)
self._tracked_tasks: List[asyncio.Task] = []
+ self.closed = False
async def setup(self) -> None:
"""Async setup of music assistant."""
if not self.http_session:
self.http_session = aiohttp.ClientSession(
loop=self.loop,
- connector=aiohttp.TCPConnector(enable_cleanup_closed=True, ssl=False),
+ connector=aiohttp.TCPConnector(ssl=False),
)
# setup core controllers
await self.cache.setup()
async def stop(self) -> None:
"""Stop running the music assistant server."""
self.logger.info("Stop called, cleaning up...")
- # cancel any running tasks
+ # cancel all running tasks
for task in self._tracked_tasks:
task.cancel()
self.signal_event(EventType.SHUTDOWN)
- # wait for any remaining tasks launched by the shutdown event
- await asyncio.wait_for(asyncio.wait(self._tracked_tasks), 2)
+ self.closed = True
if self.http_session and not self.http_session_provided:
- await self.http_session.connector.close()
- self.http_session.detach()
+ await self.http_session.close()
def signal_event(self, event_type: EventType, event_details: Any = None) -> None:
"""
:param event_msg: the eventmessage to signal
:param event_details: optional details to send with the event.
"""
+ if self.closed:
+ return
for cb_func, event_filter in self._listeners:
if event_filter is None or event_type in event_filter:
self.create_task(cb_func, event_type, event_details)
"""
Create Task on (main) event loop from Callable or awaitable.
- Tasks create dby this helper will be properly cancelled on stop.
+ Tasks created by this helper will be properly cancelled on stop.
"""
+ if self.closed:
+ return
# Check for partials to properly determine if coroutine function
check_target = target
task = self.loop.create_task(executor_wrapper(target, *args, **kwargs))
def task_done_callback(*args, **kwargs):
- self.logger.debug("task finished %s", task.get_name())
self._tracked_tasks.remove(task)
self._tracked_tasks.append(task)
task.add_done_callback(task_done_callback)
- self.logger.debug("spawned task %s", task.get_name())
return task
async def __process_jobs(self):
"""Model for a music player."""
player_id: str
+ # mass object will be set by playermanager at register
+ mass: MusicAssistant = None # type: ignore[assignment]
_attr_is_group: bool = False
_attr_group_childs: List[str] = []
_attr_name: str = ""
_attr_max_sample_rate: int = 96000
_attr_active_queue_id: str = ""
_attr_use_multi_stream: bool = False
- # mass object will be set by playermanager at register
- mass: MusicAssistant = None # type: ignore[assignment]
+ _attr_group_parent: List[str] = [] # will be set by player manager
@property
def name(self) -> bool:
def update_state(self) -> None:
"""Update current player state in the player manager."""
+ if self.mass is None or self.mass.closed:
+ # guard
+ return
+ self._attr_group_childs = self.get_group_parents()
# determine active queue for player
queue_id = self.player_id
- for player_id in self.get_group_parents():
- if player := self.mass.players.get_player(player_id):
- if player.state in [PlayerState.PLAYING, PlayerState.PAUSED]:
- queue_id = player_id
- break
+ for state in [PlayerState.PLAYING, PlayerState.PAUSED]:
+ for player_id in self._attr_group_childs:
+ if player := self.mass.players.get_player(player_id):
+ if player.state == state:
+ queue_id = player_id
+ break
self._attr_active_queue_id = queue_id
# basic throttle: do not send state changed events if player did not change
prev_state = getattr(self, "_prev_state", None)
self.mass.create_task(player.update_state)
else:
# update group player when child updates
- for group_player_id in self.get_group_parents():
+ for group_player_id in self._attr_group_childs:
if player := self.mass.players.get_player(group_player_id):
self.mass.create_task(player.update_state)
"available": self.available,
"is_group": self.is_group,
"group_childs": self.group_childs,
+ "group_parents": self._attr_group_childs,
"volume_level": int(self.volume_level),
"device_info": self.device_info.to_dict(),
"active_queue": self.active_queue.queue_id,