from abc import ABC
from dataclasses import dataclass
from enum import Enum, IntEnum
-from time import time
from typing import TYPE_CHECKING, Any, Dict, List
from mashumaro import DataClassDictMixin
# below objects will be set by playermanager at register/update
mass: MusicAssistant = None # type: ignore[assignment]
_attr_group_parents: List[str] = [] # will be set by player manager
- _last_elapsed_time_received: float = 0
_prev_state: dict = {}
@property
@property
def elapsed_time(self) -> float:
"""Return elapsed time of current playing media in seconds."""
+ # NOTE: Make sure to provide an accurate elapsed time otherwise the
+ # queue reporting of playing tracks will be wrong.
+ # this attribute will be checked every second when the queue is playing
return self._attr_elapsed_time
- @property
- def corrected_elapsed_time(self) -> float:
- """Return corrected elapsed time of current playing media in seconds."""
- return self.elapsed_time + (time() - self._last_elapsed_time_received)
-
@property
def current_url(self) -> str:
"""Return URL that is currently loaded in the player."""
# determine active queue for player
self._attr_active_queue_id = self._get_active_queue_id()
# basic throttle: do not send state changed events if player did not change
- cur_state = self._get_compare_dict()
+ cur_state = self.to_dict()
changed_keys = get_changed_keys(self._prev_state, cur_state)
- if "elapsed_time" in changed_keys:
- self._last_elapsed_time_received = time()
-
# always update the playerqueue
self.mass.players.get_player_queue(self.player_id).on_player_update()
if x.is_group and self.player_id in x.group_childs
]
- def _get_active_queue_id(self) -> PlayerQueue:
- """Return the currently active (playing) queue for this (grouped) player."""
+ def _get_active_queue_id(self) -> str:
+ """Return the currently active queue for this (grouped) player."""
for player_id in self._attr_group_parents:
player = self.mass.players.get_player(player_id)
if not player or not player.powered:
continue
- # if player.state not in [PlayerState.PLAYING, PlayerState.PAUSED]:
- # continue
queue = self.mass.players.get_player_queue(player_id)
if not queue or not queue.active:
continue
return queue.queue_id
return self.player_id
- def _get_compare_dict(self) -> Dict[str, Any]:
- """Create dict for quick compare actions."""
- base = self.to_dict()
- base["elapsed_time"] = self.elapsed_time
- return base
-
def to_dict(self) -> Dict[str, Any]:
"""Export object to dict."""
return {
"player_id": self.player_id,
"name": self.name,
"powered": self.powered,
- "elapsed_time": self.corrected_elapsed_time,
+ "elapsed_time": self.elapsed_time,
"state": self.state.value,
"available": self.available,
"is_group": self.is_group,
return int(group_volume)
@property
- def corrected_elapsed_time(self) -> float:
+ def elapsed_time(self) -> float:
"""Return the corrected/precise elsapsed time of the grouped player."""
if not self.use_multi_stream:
- return super().corrected_elapsed_time
+ return super().elapsed_time
# calculate from group childs
for child_player in self._get_child_players(True):
if not child_player.current_url:
continue
if self.player_id not in child_player.current_url:
continue
- # if child_player.state not in [PlayerState.PLAYING, PlayerState.PAUSED]:
- # continue
- return child_player.corrected_elapsed_time
+ if child_player.state not in [PlayerState.PLAYING, PlayerState.PAUSED]:
+ continue
+ return child_player.elapsed_time
return 0
@property
self._volume_normalization_target: int = -23
self._current_index: Optional[int] = None
- self._current_item_time: int = 0
+ self._current_item_elapsed_time: int = 0
self._last_item: Optional[QueueItem] = None
self._start_index: int = 0 # from which index did the queue start playing
self._next_start_index: int = 0 # which index should the stream start
def elapsed_time(self) -> float:
"""Return elapsed time of current playing media in seconds."""
if not self.active:
- return self.player.corrected_elapsed_time
- return self._current_item_time
+ return self.player.elapsed_time
+ return self._current_item_elapsed_time
@property
def repeat_enabled(self) -> bool:
if self._repeat_enabled != enable_repeat:
self._repeat_enabled = enable_repeat
self.mass.signal_event(EventType.QUEUE_UPDATED, self)
- await self._save_state()
+ await self._save_state(False)
async def set_crossfade_duration(self, duration: int) -> None:
"""Set the crossfade duration for this queue, 0 to disable."""
if self._crossfade_duration != duration:
self._crossfade_duration = duration
self.mass.signal_event(EventType.QUEUE_UPDATED, self)
- await self._save_state()
+ await self._save_state(False)
async def set_volume_normalization_enabled(self, enable: bool) -> None:
"""Set volume normalization."""
if self._repeat_enabled != enable:
self._repeat_enabled = enable
self.mass.signal_event(EventType.QUEUE_UPDATED, self)
- await self._save_state()
+ await self._save_state(False)
async def set_volume_normalization_target(self, target: int) -> None:
"""Set the target for the volume normalization in LUFS (default is -23)."""
if self._volume_normalization_target != target:
self._volume_normalization_target = target
self.mass.signal_event(EventType.QUEUE_UPDATED, self)
- await self._save_state()
+ await self._save_state(False)
async def stop(self) -> None:
"""Stop command on queue player."""
if self.player.state != PlayerState.PLAYING and self._signal_next:
self._signal_next = False
self.mass.create_task(self.resume())
- # start updater task if needed
- if self.player.state == PlayerState.PLAYING:
- if not self._update_task:
- self._update_task = self.mass.create_task(self.__update_task())
+
+ # start poll/updater task if playback starts on player
+ async def updater() -> None:
+ """Update player queue every second while playing."""
+ while True:
+ await asyncio.sleep(1)
+ self.update_state()
+
+ if self.player.state == PlayerState.PLAYING and self.active:
+ if not self._update_task or self._update_task.done():
+ self._update_task = self.mass.create_task(updater)
elif self._update_task:
self._update_task.cancel()
self._update_task = None
if self.player.active_queue.queue_id != self.queue_id:
return
new_index = self._current_index
- track_time = self._current_item_time
+ track_time = self._current_item_elapsed_time
new_item_loaded = False
# if self.player.state == PlayerState.PLAYING:
- if (
- self.player.state == PlayerState.PLAYING
- and self.player.corrected_elapsed_time > 0
- ):
+ if self.player.state == PlayerState.PLAYING and self.player.elapsed_time > 0:
new_index, track_time = self.__get_queue_stream_index()
# process new index
if self._current_index != new_index:
self._last_item.streamdetails = None
self._last_item = self.current_item
# update vars and signal update on eventbus if needed
- prev_item_time = int(self._current_item_time)
- self._current_item_time = int(track_time)
+ prev_item_time = int(self._current_item_elapsed_time)
+ self._current_item_elapsed_time = int(track_time)
+
if new_item_loaded:
self.mass.create_task(self._save_state())
- if new_item_loaded or abs(prev_item_time - self._current_item_time) >= 1:
+ if (
+ new_item_loaded
+ or abs(prev_item_time - self._current_item_elapsed_time) >= 1
+ ):
self.mass.signal_event(EventType.QUEUE_UPDATED, self)
return True
return False
async def queue_stream_start(self) -> int:
"""Call when queue_streamer starts playing the queue stream."""
start_from_index = self._next_start_index
- self._current_item_time = 0
+ self._current_item_elapsed_time = 0
self._current_index = start_from_index
self._start_index = start_from_index
self._next_start_index = self.get_next_index(start_from_index)
"""Indicate that queue stream needs to start next index once playback finished."""
self._signal_next = True
- async def __update_task(self) -> None:
- """Update player queue every second while playing."""
- while True:
- await asyncio.sleep(1)
- self.update_state()
-
def __get_queue_stream_index(self) -> Tuple[int, int]:
"""Calculate current queue index and current track elapsed time."""
# player is playing a constant stream so we need to do this the hard way
queue_index = 0
- elapsed_time_queue = self.player.corrected_elapsed_time
+ elapsed_time_queue = self.player.elapsed_time
total_time = 0
track_time = 0
if self._items and len(self._items) > self._start_index:
self._items = queue_cache["items"]
self._current_index = queue_cache["current_index"]
- async def _save_state(self) -> None:
+ async def _save_state(self, save_items: bool = True) -> None:
"""Save state in database."""
# save queue settings in db
await self.mass.database.insert_or_replace(
)
# store current items in cache
- async def cache_items():
+ if save_items:
await self.mass.cache.set(
f"queue_items.{self.queue_id}",
{"items": self._items, "current_index": self._current_index},
)
-
- if self._save_task and not self._save_task.cancelled():
- return
- self._save_task = self.mass.loop.call_later(
- 60, self.mass.create_task, cache_items
- )