class TestPlayer(Player):
"""Demonstatration player implementation."""
- def __init__(self):
+ def __init__(self, player_id: str):
"""Init."""
- self.player_id = "test"
- self._attr_name = "Test player"
+ self.player_id = player_id
+ self._attr_name = player_id
self._attr_powered = True
self._attr_elapsed_time = 0
self._attr_current_url = ""
playlists = await mass.music.playlists.library()
print(f"Got {len(playlists)} playlists in library")
# register a player
- test_player = TestPlayer()
- await mass.players.register_player(test_player)
+ test_player1 = TestPlayer("test1")
+ test_player2 = TestPlayer("test2")
+ await mass.players.register_player(test_player1)
+ await mass.players.register_player(test_player2)
# try to play some playlist
- await test_player.active_queue.set_crossfade_duration(10)
- await test_player.active_queue.set_shuffle_enabled(True)
+ await test_player1.active_queue.set_crossfade_duration(10)
+ await test_player1.active_queue.set_shuffle_enabled(True)
if len(playlists) > 0:
- await test_player.active_queue.play_media(playlists[0].uri)
+ await test_player1.active_queue.play_media(playlists[0].uri)
await asyncio.sleep(3600)
def get_stream_url(self, queue_id: str) -> str:
"""Return the full stream url for the PlayerQueue Stream."""
+ checksum = str(int(time()))
queue = self.mass.players.get_player_queue(queue_id)
if queue.player.use_multi_stream:
- return f"http://{self._ip}:{self._port}/multi/{queue_id}.wav"
- return f"http://{self._ip}:{self._port}/{queue_id}.flac"
+ return f"http://{self._ip}:{self._port}/multi/{queue_id}.wav?checksum={checksum}"
+ return f"http://{self._ip}:{self._port}/{queue_id}.flac?checksum={checksum}"
async def setup(self) -> None:
"""Async initialize of module."""
app.router.add_get(
"/multi/{queue_id}.wav", self.serve_multi_client_queue_stream
)
- app.router.add_get("/{queue_id}.{format}", self.serve_queue_stream)
- app.router.add_get("/{queue_id}", self.serve_queue_stream)
+ app.router.add_get(
+ "/{queue_id}.{format}", self.serve_queue_stream, allow_head=True
+ )
+ app.router.add_get("/{queue_id}", self.serve_queue_stream, allow_head=True)
runner = web.AppRunner(app, access_log=None)
await runner.setup()
status=200, reason="OK", headers={"Content-Type": f"audio/{fmt}"}
)
await resp.prepare(request)
+ if request.method == "HEAD":
+ return
start_streamdetails = await queue.queue_stream_prepare()
output_fmt = ContentType(fmt)
},
)
await resp.prepare(request)
+ if request.method == "HEAD":
+ return resp
# write wave header
# multi subscriber queue is (currently) limited to 44100/16 format
# TODO: add timeout guard just in case we don't reach the number of expected client
if stream_task is None and len(self._subscribers[queue_id]) >= expected_clients:
# start the stream task
+ await asyncio.sleep(1) # relax
await self.start_multi_queue_stream(queue_id)
self.logger.debug(
if stream_task := self._stream_tasks.pop(queue_id, None):
stream_task.cancel()
+ await stream_task
async def _get_queue_stream(
self,
from abc import ABC
from dataclasses import dataclass
from enum import Enum, IntEnum
+from time import time
from typing import TYPE_CHECKING, Any, Dict, List
from mashumaro import DataClassDictMixin
from music_assistant.constants import EventType
from music_assistant.helpers.typing import MusicAssistant
+from music_assistant.helpers.util import get_changed_keys
if TYPE_CHECKING:
from .player_queue import PlayerQueue
"""Model for a music player."""
player_id: str
- # mass object will be set by playermanager at register
- mass: MusicAssistant = None # type: ignore[assignment]
_attr_is_group: bool = False
_attr_group_childs: List[str] = []
_attr_name: str = ""
_attr_max_sample_rate: int = 96000
_attr_active_queue_id: str = ""
_attr_use_multi_stream: bool = False
- _attr_group_parent: List[str] = [] # will be set by player manager
+ # below objects will be set by playermanager at register/update
+ mass: MusicAssistant = None # type: ignore[assignment]
+ _group_parents: List[str] = [] # will be set by player manager
+ _last_elapsed_time_received: float = 0
+ _prev_state: dict = {}
@property
def name(self) -> bool:
"""Return elapsed time of current playing media in seconds."""
return self._attr_elapsed_time
+ @property
+ def corrected_elapsed_time(self) -> int:
+ """Return corrected elapsed time of current playing media in seconds."""
+ return self._attr_elapsed_time + (time() - self._last_elapsed_time_received)
+
@property
def current_url(self) -> str:
"""Return URL that is currently loaded in the player."""
+ return self._attr_current_url
@property
def state(self) -> PlayerState:
"""Toggle power on player."""
await self.power(not self.powered)
+ def on_update_state(self) -> None:
+ """Call when player state is about to be updated in the player manager."""
+ # this is called from `update_state` to apply some additional custom logic
+
+ def on_child_update(self, player_id: str, changed_keys: set) -> None:
+ """Call when one of the child players of a playergroup updates."""
+ self.update_state(skip_forward=True)
+
+ def on_parent_update(self, player_id: str, changed_keys: set) -> None:
+ """Call when one the parent player of a grouped player updates."""
+ self.update_state(skip_forward=True)
+
# DO NOT OVERRIDE BELOW
- def update_state(self) -> None:
+ def update_state(self, skip_forward: bool = False) -> None:
"""Update current player state in the player manager."""
if self.mass is None or self.mass.closed:
# guard
return
- self._attr_group_childs = self.get_group_parents()
+ self.on_update_state()
+ self._group_parents = self._get_group_parents()
# determine active queue for player
- queue_id = self.player_id
- for state in [PlayerState.PLAYING, PlayerState.PAUSED]:
- for player_id in self._attr_group_childs:
- if player := self.mass.players.get_player(player_id):
- if player.state == state:
- queue_id = player_id
- break
- self._attr_active_queue_id = queue_id
+ self._attr_active_queue_id = self._get_active_queue_id()
# basic throttle: do not send state changed events if player did not change
- prev_state = getattr(self, "_prev_state", None)
cur_state = self.to_dict()
- if prev_state == cur_state:
+ changed_keys = get_changed_keys(self._prev_state, cur_state)
+
+ if "elapsed_time" in changed_keys:
+ self._last_elapsed_time_received = time()
+ elif "state" in changed_keys and self.state == PlayerState.PLAYING:
+ self._attr_elapsed_time = 0
+ self._last_elapsed_time_received = time()
+
+ # always update the playerqueue
+ self.mass.players.get_player_queue(self.player_id).on_player_update()
+
+ if len(changed_keys) == 0 or changed_keys == {"elapsed_time"}:
return
- setattr(self, "_prev_state", cur_state)
+
+ self._prev_state = cur_state
self.mass.signal_event(EventType.PLAYER_CHANGED, self)
- self.mass.players.get_player_queue(self.player_id).on_player_update()
+
+ if skip_forward:
+ return
if self.is_group:
# update group player childs when parent updates
for child_player_id in self.group_childs:
if player := self.mass.players.get_player(child_player_id):
- self.mass.create_task(player.update_state)
- else:
- # update group player when child updates
- for group_player_id in self._attr_group_childs:
- if player := self.mass.players.get_player(group_player_id):
- self.mass.create_task(player.update_state)
-
- def get_group_parents(self) -> List[str]:
+ self.mass.create_task(
+ player.on_parent_update, self.player_id, changed_keys
+ )
+ return
+ # update group player when child updates
+ for group_player_id in self._group_parents:
+ if player := self.mass.players.get_player(group_player_id):
+ self.mass.create_task(
+ player.on_child_update, self.player_id, changed_keys
+ )
+
+ def _get_group_parents(self) -> List[str]:
"""Get any/all group player id's this player belongs to."""
return [
x.player_id
if x.is_group and self.player_id in x.group_childs
]
+ def _get_active_queue_id(self) -> PlayerQueue:
+ """Return the currently active (playing) queue for this (grouped) player."""
+ for player_id in self._group_parents:
+ player = self.mass.players.get_player(player_id)
+ if not player or not player.powered:
+ continue
+ queue = self.mass.players.get_player_queue(player_id)
+ if not queue or not queue.active:
+ continue
+ # match found!
+ return queue.queue_id
+ return self.player_id
+
def to_dict(self) -> Dict[str, Any]:
"""Export object to dict."""
return {
"available": self.available,
"is_group": self.is_group,
"group_childs": self.group_childs,
- "group_parents": self._attr_group_childs,
+ "group_parents": self._group_parents,
"volume_level": int(self.volume_level),
"device_info": self.device_info.to_dict(),
"active_queue": self.active_queue.queue_id,
for x in self.mass.players
if x.player_id in self.group_childs and x.powered or not only_powered
]
+
+ def on_child_update(self, player_id: str, changed_keys: set) -> None:
+ """Call when one of the child players of a playergroup updates."""
+ # convenience helper:
+ # power off group player if last child player turns off
+ powered_childs = set()
+ for child_id in self._attr_group_childs:
+ if player := self.mass.players.get_player(child_id):
+ if player.powered:
+ powered_childs.add(child_id)
+ if self.powered and len(powered_childs) == 0:
+ self.mass.create_task(self.power(False))
@property
def active(self) -> bool:
"""Return bool if the queue is currenty active on the player."""
- if self.player.current_url is None or self._stream_url is None:
- return False
- return self._stream_url in self.player.current_url
+ return self._stream_url == self.player.current_url
@property
def elapsed_time(self) -> int:
self._next_start_index = index
# send stream url to player connected to this queue
+ if self.player.use_multi_stream and self.player.state == PlayerState.PLAYING:
+ await self.player.stop()
+ await asyncio.sleep(1)
self._stream_url = self.mass.players.streams.get_stream_url(self.queue_id)
await self.player.play_url(self._stream_url)
self.update_state()
await asyncio.sleep(1)
- def __get_total_elapsed_time(self) -> int:
- """Calculate the total elapsed time of the queue(player)."""
- if self.player.state == PlayerState.PLAYING:
- time_diff = time.time() - self._last_player_update
- return int(self.player.elapsed_time + time_diff)
- if self.player.state == PlayerState.PAUSED:
- return self.player.elapsed_time
- return 0
-
def __get_queue_stream_index(self) -> Tuple[int, int]:
"""Calculate current queue index and current track elapsed time."""
# player is playing a constant stream so we need to do this the hard way
queue_index = 0
- elapsed_time_queue = self.__get_total_elapsed_time()
+ elapsed_time_queue = self.player.corrected_elapsed_time
total_time = 0
track_time = 0
if self._items and len(self._items) > self._start_index: