From 59ace1ee41c1edbe9e2bd711607ee69793c35a03 Mon Sep 17 00:00:00 2001 From: Marcel van der Veldt Date: Thu, 7 Apr 2022 00:21:11 +0200 Subject: [PATCH] improve support for grouped players (#247) --- examples/full.py | 18 +-- music_assistant/controllers/music/__init__.py | 6 + music_assistant/controllers/stream.py | 17 ++- music_assistant/helpers/util.py | 4 +- music_assistant/models/player.py | 110 +++++++++++++----- music_assistant/models/player_queue.py | 18 +-- 6 files changed, 121 insertions(+), 52 deletions(-) diff --git a/examples/full.py b/examples/full.py index dba2ca64..a2e7a166 100644 --- a/examples/full.py +++ b/examples/full.py @@ -89,10 +89,10 @@ if args.musicdir: class TestPlayer(Player): """Demonstatration player implementation.""" - def __init__(self): + def __init__(self, player_id: str): """Init.""" - self.player_id = "test" - self._attr_name = "Test player" + self.player_id = player_id + self._attr_name = player_id self._attr_powered = True self._attr_elapsed_time = 0 self._attr_current_url = "" @@ -163,13 +163,15 @@ async def main(): playlists = await mass.music.playlists.library() print(f"Got {len(playlists)} playlists in library") # register a player - test_player = TestPlayer() - await mass.players.register_player(test_player) + test_player1 = TestPlayer("test1") + test_player2 = TestPlayer("test2") + await mass.players.register_player(test_player1) + await mass.players.register_player(test_player2) # try to play some playlist - await test_player.active_queue.set_crossfade_duration(10) - await test_player.active_queue.set_shuffle_enabled(True) + await test_player1.active_queue.set_crossfade_duration(10) + await test_player1.active_queue.set_shuffle_enabled(True) if len(playlists) > 0: - await test_player.active_queue.play_media(playlists[0].uri) + await test_player1.active_queue.play_media(playlists[0].uri) await asyncio.sleep(3600) diff --git a/music_assistant/controllers/music/__init__.py b/music_assistant/controllers/music/__init__.py index 475e0df4..a757ed35 100755 --- a/music_assistant/controllers/music/__init__.py +++ b/music_assistant/controllers/music/__init__.py @@ -403,6 +403,8 @@ class MusicController: # sync playlist tracks if media_type == MediaType.PLAYLIST: await self._sync_playlist_tracks(db_item) + # chill a bit otherwise sync is really heavy for the system + await asyncio.sleep(0.1) # process deletions for item_id in prev_ids: @@ -430,6 +432,8 @@ class MusicController: album_track.disc_number, album_track.track_number, ) + # chill a bit otherwise sync is really heavy for the system + await asyncio.sleep(0.1) async def _sync_playlist_tracks(self, db_playlist: Playlist) -> None: """Store playlist tracks of in-library playlist in database.""" @@ -451,6 +455,8 @@ class MusicController: db_track.item_id, playlist_track.position, ) + # chill a bit otherwise sync is really heavy for the system + await asyncio.sleep(0.1) def _get_controller( self, media_type: MediaType diff --git a/music_assistant/controllers/stream.py b/music_assistant/controllers/stream.py index bc306fd6..741acc81 100644 --- a/music_assistant/controllers/stream.py +++ b/music_assistant/controllers/stream.py @@ -40,10 +40,11 @@ class StreamController: def get_stream_url(self, queue_id: str) -> str: """Return the full stream url for the PlayerQueue Stream.""" + checksum = str(int(time())) queue = self.mass.players.get_player_queue(queue_id) if queue.player.use_multi_stream: - return f"http://{self._ip}:{self._port}/multi/{queue_id}.wav" - return f"http://{self._ip}:{self._port}/{queue_id}.flac" + return f"http://{self._ip}:{self._port}/multi/{queue_id}.wav?checksum={checksum}" + return f"http://{self._ip}:{self._port}/{queue_id}.flac?checksum={checksum}" async def setup(self) -> None: """Async initialize of module.""" @@ -52,8 +53,10 @@ class StreamController: app.router.add_get( "/multi/{queue_id}.wav", self.serve_multi_client_queue_stream ) - app.router.add_get("/{queue_id}.{format}", self.serve_queue_stream) - app.router.add_get("/{queue_id}", self.serve_queue_stream) + app.router.add_get( + "/{queue_id}.{format}", self.serve_queue_stream, allow_head=True + ) + app.router.add_get("/{queue_id}", self.serve_queue_stream, allow_head=True) runner = web.AppRunner(app, access_log=None) await runner.setup() @@ -100,6 +103,8 @@ class StreamController: status=200, reason="OK", headers={"Content-Type": f"audio/{fmt}"} ) await resp.prepare(request) + if request.method == "HEAD": + return start_streamdetails = await queue.queue_stream_prepare() output_fmt = ContentType(fmt) @@ -154,6 +159,8 @@ class StreamController: }, ) await resp.prepare(request) + if request.method == "HEAD": + return resp # write wave header # multi subscriber queue is (currently) limited to 44100/16 format @@ -201,6 +208,7 @@ class StreamController: # TODO: add timeout guard just in case we don't reach the number of expected client if stream_task is None and len(self._subscribers[queue_id]) >= expected_clients: # start the stream task + await asyncio.sleep(1) # relax await self.start_multi_queue_stream(queue_id) self.logger.debug( @@ -260,6 +268,7 @@ class StreamController: if stream_task := self._stream_tasks.pop(queue_id, None): stream_task.cancel() + await stream_task async def _get_queue_stream( self, diff --git a/music_assistant/helpers/util.py b/music_assistant/helpers/util.py index d49a8f3d..b45cdab0 100755 --- a/music_assistant/helpers/util.py +++ b/music_assistant/helpers/util.py @@ -229,7 +229,9 @@ def get_changed_keys( for key, value in dict2.items(): if ignore_keys and key in ignore_keys: continue - if isinstance(value, dict): + if key not in dict1: + changed_keys.add(key) + elif isinstance(value, dict): changed_keys.update(get_changed_keys(dict1[key], value)) elif dict1[key] != value: changed_keys.add(key) diff --git a/music_assistant/models/player.py b/music_assistant/models/player.py index 2f6bba21..4ea0b08d 100755 --- a/music_assistant/models/player.py +++ b/music_assistant/models/player.py @@ -4,11 +4,13 @@ from __future__ import annotations from abc import ABC from dataclasses import dataclass from enum import Enum, IntEnum +from time import time from typing import TYPE_CHECKING, Any, Dict, List from mashumaro import DataClassDictMixin from music_assistant.constants import EventType from music_assistant.helpers.typing import MusicAssistant +from music_assistant.helpers.util import get_changed_keys if TYPE_CHECKING: from .player_queue import PlayerQueue @@ -44,8 +46,6 @@ class Player(ABC): """Model for a music player.""" player_id: str - # mass object will be set by playermanager at register - mass: MusicAssistant = None # type: ignore[assignment] _attr_is_group: bool = False _attr_group_childs: List[str] = [] _attr_name: str = "" @@ -59,7 +59,11 @@ class Player(ABC): _attr_max_sample_rate: int = 96000 _attr_active_queue_id: str = "" _attr_use_multi_stream: bool = False - _attr_group_parent: List[str] = [] # will be set by player manager + # below objects will be set by playermanager at register/update + mass: MusicAssistant = None # type: ignore[assignment] + _group_parents: List[str] = [] # will be set by player manager + _last_elapsed_time_received: float = 0 + _prev_state: dict = {} @property def name(self) -> bool: @@ -86,9 +90,15 @@ class Player(ABC): """Return elapsed time of current playing media in seconds.""" return self._attr_elapsed_time + @property + def corrected_elapsed_time(self) -> int: + """Return corrected elapsed time of current playing media in seconds.""" + return self._attr_elapsed_time + (time() - self._last_elapsed_time_received) + @property def current_url(self) -> str: """Return URL that is currently loaded in the player.""" + return self._attr_current_url @property def state(self) -> PlayerState: @@ -186,43 +196,66 @@ class Player(ABC): """Toggle power on player.""" await self.power(not self.powered) + def on_update_state(self) -> None: + """Call when player state is about to be updated in the player manager.""" + # this is called from `update_state` to apply some additional custom logic + + def on_child_update(self, player_id: str, changed_keys: set) -> None: + """Call when one of the child players of a playergroup updates.""" + self.update_state(skip_forward=True) + + def on_parent_update(self, player_id: str, changed_keys: set) -> None: + """Call when one the parent player of a grouped player updates.""" + self.update_state(skip_forward=True) + # DO NOT OVERRIDE BELOW - def update_state(self) -> None: + def update_state(self, skip_forward: bool = False) -> None: """Update current player state in the player manager.""" if self.mass is None or self.mass.closed: # guard return - self._attr_group_childs = self.get_group_parents() + self.on_update_state() + self._group_parents = self._get_group_parents() # determine active queue for player - queue_id = self.player_id - for state in [PlayerState.PLAYING, PlayerState.PAUSED]: - for player_id in self._attr_group_childs: - if player := self.mass.players.get_player(player_id): - if player.state == state: - queue_id = player_id - break - self._attr_active_queue_id = queue_id + self._attr_active_queue_id = self._get_active_queue_id() # basic throttle: do not send state changed events if player did not change - prev_state = getattr(self, "_prev_state", None) cur_state = self.to_dict() - if prev_state == cur_state: + changed_keys = get_changed_keys(self._prev_state, cur_state) + + if "elapsed_time" in changed_keys: + self._last_elapsed_time_received = time() + elif "state" in changed_keys and self.state == PlayerState.PLAYING: + self._attr_elapsed_time = 0 + self._last_elapsed_time_received = time() + + # always update the playerqueue + self.mass.players.get_player_queue(self.player_id).on_player_update() + + if len(changed_keys) == 0 or changed_keys == {"elapsed_time"}: return - setattr(self, "_prev_state", cur_state) + + self._prev_state = cur_state self.mass.signal_event(EventType.PLAYER_CHANGED, self) - self.mass.players.get_player_queue(self.player_id).on_player_update() + + if skip_forward: + return if self.is_group: # update group player childs when parent updates for child_player_id in self.group_childs: if player := self.mass.players.get_player(child_player_id): - self.mass.create_task(player.update_state) - else: - # update group player when child updates - for group_player_id in self._attr_group_childs: - if player := self.mass.players.get_player(group_player_id): - self.mass.create_task(player.update_state) - - def get_group_parents(self) -> List[str]: + self.mass.create_task( + player.on_parent_update, self.player_id, changed_keys + ) + return + # update group player when child updates + for group_player_id in self._group_parents: + if player := self.mass.players.get_player(group_player_id): + self.mass.create_task( + player.on_child_update, self.player_id, changed_keys + ) + + def _get_group_parents(self) -> List[str]: """Get any/all group player id's this player belongs to.""" return [ x.player_id @@ -230,6 +263,19 @@ class Player(ABC): if x.is_group and self.player_id in x.group_childs ] + def _get_active_queue_id(self) -> PlayerQueue: + """Return the currently active (playing) queue for this (grouped) player.""" + for player_id in self._group_parents: + player = self.mass.players.get_player(player_id) + if not player or not player.powered: + continue + queue = self.mass.players.get_player_queue(player_id) + if not queue or not queue.active: + continue + # match found! + return queue.queue_id + return self.player_id + def to_dict(self) -> Dict[str, Any]: """Export object to dict.""" return { @@ -241,7 +287,7 @@ class Player(ABC): "available": self.available, "is_group": self.is_group, "group_childs": self.group_childs, - "group_parents": self._attr_group_childs, + "group_parents": self._group_parents, "volume_level": int(self.volume_level), "device_info": self.device_info.to_dict(), "active_queue": self.active_queue.queue_id, @@ -296,3 +342,15 @@ class PlayerGroup(Player): for x in self.mass.players if x.player_id in self.group_childs and x.powered or not only_powered ] + + def on_child_update(self, player_id: str, changed_keys: set) -> None: + """Call when one of the child players of a playergroup updates.""" + # convenience helper: + # power off group player if last child player turns off + powered_childs = set() + for child_id in self._attr_group_childs: + if player := self.mass.players.get_player(child_id): + if player.powered: + powered_childs.add(child_id) + if self.powered and len(powered_childs) == 0: + self.mass.create_task(self.power(False)) diff --git a/music_assistant/models/player_queue.py b/music_assistant/models/player_queue.py index 5f45117f..0a1748f7 100644 --- a/music_assistant/models/player_queue.py +++ b/music_assistant/models/player_queue.py @@ -103,9 +103,7 @@ class PlayerQueue: @property def active(self) -> bool: """Return bool if the queue is currenty active on the player.""" - if self.player.current_url is None or self._stream_url is None: - return False - return self._stream_url in self.player.current_url + return self._stream_url == self.player.current_url @property def elapsed_time(self) -> int: @@ -372,6 +370,9 @@ class PlayerQueue: self._next_start_index = index # send stream url to player connected to this queue + if self.player.use_multi_stream and self.player.state == PlayerState.PLAYING: + await self.player.stop() + await asyncio.sleep(1) self._stream_url = self.mass.players.streams.get_stream_url(self.queue_id) await self.player.play_url(self._stream_url) @@ -578,20 +579,11 @@ class PlayerQueue: self.update_state() await asyncio.sleep(1) - def __get_total_elapsed_time(self) -> int: - """Calculate the total elapsed time of the queue(player).""" - if self.player.state == PlayerState.PLAYING: - time_diff = time.time() - self._last_player_update - return int(self.player.elapsed_time + time_diff) - if self.player.state == PlayerState.PAUSED: - return self.player.elapsed_time - return 0 - def __get_queue_stream_index(self) -> Tuple[int, int]: """Calculate current queue index and current track elapsed time.""" # player is playing a constant stream so we need to do this the hard way queue_index = 0 - elapsed_time_queue = self.__get_total_elapsed_time() + elapsed_time_queue = self.player.corrected_elapsed_time total_time = 0 track_time = 0 if self._items and len(self._items) > self._start_index: -- 2.34.1