improve support for grouped players (#247)
authorMarcel van der Veldt <m.vanderveldt@outlook.com>
Wed, 6 Apr 2022 22:21:11 +0000 (00:21 +0200)
committerGitHub <noreply@github.com>
Wed, 6 Apr 2022 22:21:11 +0000 (00:21 +0200)
examples/full.py
music_assistant/controllers/music/__init__.py
music_assistant/controllers/stream.py
music_assistant/helpers/util.py
music_assistant/models/player.py
music_assistant/models/player_queue.py

index dba2ca64ef35a5fd89e8c708b03a71073e659fc7..a2e7a166796a20f9bce4cb087cb45c4aecd29d84 100644 (file)
@@ -89,10 +89,10 @@ if args.musicdir:
 class TestPlayer(Player):
     """Demonstatration player implementation."""
 
-    def __init__(self):
+    def __init__(self, player_id: str):
         """Init."""
-        self.player_id = "test"
-        self._attr_name = "Test player"
+        self.player_id = player_id
+        self._attr_name = player_id
         self._attr_powered = True
         self._attr_elapsed_time = 0
         self._attr_current_url = ""
@@ -163,13 +163,15 @@ async def main():
         playlists = await mass.music.playlists.library()
         print(f"Got {len(playlists)} playlists in library")
         # register a player
-        test_player = TestPlayer()
-        await mass.players.register_player(test_player)
+        test_player1 = TestPlayer("test1")
+        test_player2 = TestPlayer("test2")
+        await mass.players.register_player(test_player1)
+        await mass.players.register_player(test_player2)
         # try to play some playlist
-        await test_player.active_queue.set_crossfade_duration(10)
-        await test_player.active_queue.set_shuffle_enabled(True)
+        await test_player1.active_queue.set_crossfade_duration(10)
+        await test_player1.active_queue.set_shuffle_enabled(True)
         if len(playlists) > 0:
-            await test_player.active_queue.play_media(playlists[0].uri)
+            await test_player1.active_queue.play_media(playlists[0].uri)
 
         await asyncio.sleep(3600)
 
index 475e0df4e966520a8c2aed1f33b2197810d51f35..a757ed358a0ce89768da4fcede3783b643138549 100755 (executable)
@@ -403,6 +403,8 @@ class MusicController:
                 # sync playlist tracks
                 if media_type == MediaType.PLAYLIST:
                     await self._sync_playlist_tracks(db_item)
+                # chill a bit otherwise sync is really heavy for the system
+                await asyncio.sleep(0.1)
 
             # process deletions
             for item_id in prev_ids:
@@ -430,6 +432,8 @@ class MusicController:
                     album_track.disc_number,
                     album_track.track_number,
                 )
+                # chill a bit otherwise sync is really heavy for the system
+                await asyncio.sleep(0.1)
 
     async def _sync_playlist_tracks(self, db_playlist: Playlist) -> None:
         """Store playlist tracks of in-library playlist in database."""
@@ -451,6 +455,8 @@ class MusicController:
                     db_track.item_id,
                     playlist_track.position,
                 )
+                # chill a bit otherwise sync is really heavy for the system
+                await asyncio.sleep(0.1)
 
     def _get_controller(
         self, media_type: MediaType
index bc306fd6bc32f49441e71a8d6be3c31046bc29bf..741acc81aba3980a2a3b74d2e0c19e2614404d71 100644 (file)
@@ -40,10 +40,11 @@ class StreamController:
 
     def get_stream_url(self, queue_id: str) -> str:
         """Return the full stream url for the PlayerQueue Stream."""
+        checksum = str(int(time()))
         queue = self.mass.players.get_player_queue(queue_id)
         if queue.player.use_multi_stream:
-            return f"http://{self._ip}:{self._port}/multi/{queue_id}.wav"
-        return f"http://{self._ip}:{self._port}/{queue_id}.flac"
+            return f"http://{self._ip}:{self._port}/multi/{queue_id}.wav?checksum={checksum}"
+        return f"http://{self._ip}:{self._port}/{queue_id}.flac?checksum={checksum}"
 
     async def setup(self) -> None:
         """Async initialize of module."""
@@ -52,8 +53,10 @@ class StreamController:
         app.router.add_get(
             "/multi/{queue_id}.wav", self.serve_multi_client_queue_stream
         )
-        app.router.add_get("/{queue_id}.{format}", self.serve_queue_stream)
-        app.router.add_get("/{queue_id}", self.serve_queue_stream)
+        app.router.add_get(
+            "/{queue_id}.{format}", self.serve_queue_stream, allow_head=True
+        )
+        app.router.add_get("/{queue_id}", self.serve_queue_stream, allow_head=True)
 
         runner = web.AppRunner(app, access_log=None)
         await runner.setup()
@@ -100,6 +103,8 @@ class StreamController:
             status=200, reason="OK", headers={"Content-Type": f"audio/{fmt}"}
         )
         await resp.prepare(request)
+        if request.method == "HEAD":
+            return
 
         start_streamdetails = await queue.queue_stream_prepare()
         output_fmt = ContentType(fmt)
@@ -154,6 +159,8 @@ class StreamController:
             },
         )
         await resp.prepare(request)
+        if request.method == "HEAD":
+            return resp
 
         # write wave header
         # multi subscriber queue is (currently) limited to 44100/16 format
@@ -201,6 +208,7 @@ class StreamController:
         # TODO: add timeout guard just in case we don't reach the number of expected client
         if stream_task is None and len(self._subscribers[queue_id]) >= expected_clients:
             # start the stream task
+            await asyncio.sleep(1)  # relax
             await self.start_multi_queue_stream(queue_id)
 
         self.logger.debug(
@@ -260,6 +268,7 @@ class StreamController:
 
         if stream_task := self._stream_tasks.pop(queue_id, None):
             stream_task.cancel()
+            await stream_task
 
     async def _get_queue_stream(
         self,
index d49a8f3dd5807d2234cbf5edeb2f48377f2f3095..b45cdab03d1ea121317bf626f822cacb139ce626 100755 (executable)
@@ -229,7 +229,9 @@ def get_changed_keys(
     for key, value in dict2.items():
         if ignore_keys and key in ignore_keys:
             continue
-        if isinstance(value, dict):
+        if key not in dict1:
+            changed_keys.add(key)
+        elif isinstance(value, dict):
             changed_keys.update(get_changed_keys(dict1[key], value))
         elif dict1[key] != value:
             changed_keys.add(key)
index 2f6bba212c6862cc478a4689aa03f817a5623ed7..4ea0b08d24523510f9738703c73d12abbb3885da 100755 (executable)
@@ -4,11 +4,13 @@ from __future__ import annotations
 from abc import ABC
 from dataclasses import dataclass
 from enum import Enum, IntEnum
+from time import time
 from typing import TYPE_CHECKING, Any, Dict, List
 
 from mashumaro import DataClassDictMixin
 from music_assistant.constants import EventType
 from music_assistant.helpers.typing import MusicAssistant
+from music_assistant.helpers.util import get_changed_keys
 
 if TYPE_CHECKING:
     from .player_queue import PlayerQueue
@@ -44,8 +46,6 @@ class Player(ABC):
     """Model for a music player."""
 
     player_id: str
-    # mass object will be set by playermanager at register
-    mass: MusicAssistant = None  # type: ignore[assignment]
     _attr_is_group: bool = False
     _attr_group_childs: List[str] = []
     _attr_name: str = ""
@@ -59,7 +59,11 @@ class Player(ABC):
     _attr_max_sample_rate: int = 96000
     _attr_active_queue_id: str = ""
     _attr_use_multi_stream: bool = False
-    _attr_group_parent: List[str] = []  # will be set by player manager
+    # below objects will be set by playermanager at register/update
+    mass: MusicAssistant = None  # type: ignore[assignment]
+    _group_parents: List[str] = []  # will be set by player manager
+    _last_elapsed_time_received: float = 0
+    _prev_state: dict = {}
 
     @property
     def name(self) -> bool:
@@ -86,9 +90,15 @@ class Player(ABC):
         """Return elapsed time of current playing media in seconds."""
         return self._attr_elapsed_time
 
+    @property
+    def corrected_elapsed_time(self) -> int:
+        """Return corrected elapsed time of current playing media in seconds."""
+        return self._attr_elapsed_time + (time() - self._last_elapsed_time_received)
+
     @property
     def current_url(self) -> str:
         """Return URL that is currently loaded in the player."""
+        return self._attr_current_url
 
     @property
     def state(self) -> PlayerState:
@@ -186,43 +196,66 @@ class Player(ABC):
         """Toggle power on player."""
         await self.power(not self.powered)
 
+    def on_update_state(self) -> None:
+        """Call when player state is about to be updated in the player manager."""
+        # this is called from `update_state` to apply some additional custom logic
+
+    def on_child_update(self, player_id: str, changed_keys: set) -> None:
+        """Call when one of the child players of a playergroup updates."""
+        self.update_state(skip_forward=True)
+
+    def on_parent_update(self, player_id: str, changed_keys: set) -> None:
+        """Call when one the parent player of a grouped player updates."""
+        self.update_state(skip_forward=True)
+
     # DO NOT OVERRIDE BELOW
 
-    def update_state(self) -> None:
+    def update_state(self, skip_forward: bool = False) -> None:
         """Update current player state in the player manager."""
         if self.mass is None or self.mass.closed:
             # guard
             return
-        self._attr_group_childs = self.get_group_parents()
+        self.on_update_state()
+        self._group_parents = self._get_group_parents()
         # determine active queue for player
-        queue_id = self.player_id
-        for state in [PlayerState.PLAYING, PlayerState.PAUSED]:
-            for player_id in self._attr_group_childs:
-                if player := self.mass.players.get_player(player_id):
-                    if player.state == state:
-                        queue_id = player_id
-                        break
-        self._attr_active_queue_id = queue_id
+        self._attr_active_queue_id = self._get_active_queue_id()
         # basic throttle: do not send state changed events if player did not change
-        prev_state = getattr(self, "_prev_state", None)
         cur_state = self.to_dict()
-        if prev_state == cur_state:
+        changed_keys = get_changed_keys(self._prev_state, cur_state)
+
+        if "elapsed_time" in changed_keys:
+            self._last_elapsed_time_received = time()
+        elif "state" in changed_keys and self.state == PlayerState.PLAYING:
+            self._attr_elapsed_time = 0
+            self._last_elapsed_time_received = time()
+
+        # always update the playerqueue
+        self.mass.players.get_player_queue(self.player_id).on_player_update()
+
+        if len(changed_keys) == 0 or changed_keys == {"elapsed_time"}:
             return
-        setattr(self, "_prev_state", cur_state)
+
+        self._prev_state = cur_state
         self.mass.signal_event(EventType.PLAYER_CHANGED, self)
-        self.mass.players.get_player_queue(self.player_id).on_player_update()
+
+        if skip_forward:
+            return
         if self.is_group:
             # update group player childs when parent updates
             for child_player_id in self.group_childs:
                 if player := self.mass.players.get_player(child_player_id):
-                    self.mass.create_task(player.update_state)
-        else:
-            # update group player when child updates
-            for group_player_id in self._attr_group_childs:
-                if player := self.mass.players.get_player(group_player_id):
-                    self.mass.create_task(player.update_state)
-
-    def get_group_parents(self) -> List[str]:
+                    self.mass.create_task(
+                        player.on_parent_update, self.player_id, changed_keys
+                    )
+            return
+        # update group player when child updates
+        for group_player_id in self._group_parents:
+            if player := self.mass.players.get_player(group_player_id):
+                self.mass.create_task(
+                    player.on_child_update, self.player_id, changed_keys
+                )
+
+    def _get_group_parents(self) -> List[str]:
         """Get any/all group player id's this player belongs to."""
         return [
             x.player_id
@@ -230,6 +263,19 @@ class Player(ABC):
             if x.is_group and self.player_id in x.group_childs
         ]
 
+    def _get_active_queue_id(self) -> PlayerQueue:
+        """Return the currently active (playing) queue for this (grouped) player."""
+        for player_id in self._group_parents:
+            player = self.mass.players.get_player(player_id)
+            if not player or not player.powered:
+                continue
+            queue = self.mass.players.get_player_queue(player_id)
+            if not queue or not queue.active:
+                continue
+            # match found!
+            return queue.queue_id
+        return self.player_id
+
     def to_dict(self) -> Dict[str, Any]:
         """Export object to dict."""
         return {
@@ -241,7 +287,7 @@ class Player(ABC):
             "available": self.available,
             "is_group": self.is_group,
             "group_childs": self.group_childs,
-            "group_parents": self._attr_group_childs,
+            "group_parents": self._group_parents,
             "volume_level": int(self.volume_level),
             "device_info": self.device_info.to_dict(),
             "active_queue": self.active_queue.queue_id,
@@ -296,3 +342,15 @@ class PlayerGroup(Player):
             for x in self.mass.players
             if x.player_id in self.group_childs and x.powered or not only_powered
         ]
+
+    def on_child_update(self, player_id: str, changed_keys: set) -> None:
+        """Call when one of the child players of a playergroup updates."""
+        # convenience helper:
+        # power off group player if last child player turns off
+        powered_childs = set()
+        for child_id in self._attr_group_childs:
+            if player := self.mass.players.get_player(child_id):
+                if player.powered:
+                    powered_childs.add(child_id)
+        if self.powered and len(powered_childs) == 0:
+            self.mass.create_task(self.power(False))
index 5f45117f0881de59f14b0ebf8b7b4805e7147141..0a1748f79912c44045f502cbc1af0d9083006878 100644 (file)
@@ -103,9 +103,7 @@ class PlayerQueue:
     @property
     def active(self) -> bool:
         """Return bool if the queue is currenty active on the player."""
-        if self.player.current_url is None or self._stream_url is None:
-            return False
-        return self._stream_url in self.player.current_url
+        return self._stream_url == self.player.current_url
 
     @property
     def elapsed_time(self) -> int:
@@ -372,6 +370,9 @@ class PlayerQueue:
         self._next_start_index = index
 
         # send stream url to player connected to this queue
+        if self.player.use_multi_stream and self.player.state == PlayerState.PLAYING:
+            await self.player.stop()
+            await asyncio.sleep(1)
         self._stream_url = self.mass.players.streams.get_stream_url(self.queue_id)
         await self.player.play_url(self._stream_url)
 
@@ -578,20 +579,11 @@ class PlayerQueue:
             self.update_state()
             await asyncio.sleep(1)
 
-    def __get_total_elapsed_time(self) -> int:
-        """Calculate the total elapsed time of the queue(player)."""
-        if self.player.state == PlayerState.PLAYING:
-            time_diff = time.time() - self._last_player_update
-            return int(self.player.elapsed_time + time_diff)
-        if self.player.state == PlayerState.PAUSED:
-            return self.player.elapsed_time
-        return 0
-
     def __get_queue_stream_index(self) -> Tuple[int, int]:
         """Calculate current queue index and current track elapsed time."""
         # player is playing a constant stream so we need to do this the hard way
         queue_index = 0
-        elapsed_time_queue = self.__get_total_elapsed_time()
+        elapsed_time_queue = self.player.corrected_elapsed_time
         total_time = 0
         track_time = 0
         if self._items and len(self._items) > self._start_index: