From: Marcel van der Veldt Date: Fri, 8 Apr 2022 13:16:09 +0000 (+0200) Subject: use float for elapsed time for more accurate precision X-Git-Url: https://git.kitaultman.com/?a=commitdiff_plain;h=c282aab49081fc9d1086128432e9d737ac110232;p=music-assistant-server.git use float for elapsed time for more accurate precision --- diff --git a/music_assistant/controllers/stream.py b/music_assistant/controllers/stream.py index c3aea3e2..a3bac42c 100644 --- a/music_assistant/controllers/stream.py +++ b/music_assistant/controllers/stream.py @@ -19,7 +19,7 @@ from music_assistant.helpers.audio import ( from music_assistant.helpers.process import AsyncProcess from music_assistant.helpers.typing import MusicAssistant from music_assistant.helpers.util import get_ip -from music_assistant.models.errors import MediaNotFoundError +from music_assistant.models.errors import MediaNotFoundError, MusicAssistantError from music_assistant.models.media_items import ContentType from music_assistant.models.player_queue import PlayerQueue @@ -161,18 +161,24 @@ class StreamController: await self.subscribe_client(queue_id, player_id) try: while True: - audio_chunk = await self._client_queues[queue_id][player_id].get() - await resp.write(audio_chunk) - self._client_queues[queue_id][player_id].task_done() + client_queue = self._client_queues.get(queue_id).get(player_id) + if not client_queue: + break + audio_chunk = await client_queue.get() if audio_chunk == b"": - # last chunk + # eof break + await resp.write(audio_chunk) + client_queue.task_done() finally: await self.unsubscribe_client(queue_id, player_id) return resp async def subscribe_client(self, queue_id: str, player_id: str) -> None: """Subscribe client to queue stream.""" + if queue_id not in self._stream_tasks: + raise MusicAssistantError(f"No Queue stream available for {queue_id}") + if queue_id not in self._subscribers: self._subscribers[queue_id] = set() self._subscribers[queue_id].add(player_id) @@ -188,7 +194,7 @@ class StreamController: if player_id in self._subscribers[queue_id]: self._subscribers[queue_id].remove(player_id) - self._client_queues.get(queue_id, {}).pop(player_id, None) + self.__cleanup_client_queue(queue_id, player_id) self.logger.debug( "Unsubscribed player %s from multi queue stream %s", player_id, queue_id ) @@ -217,25 +223,6 @@ class StreamController: self.__multi_client_queue_stream_runner(queue_id, output_fmt) ) - async def unpause_synced_start(): - # unpause if we got all clients (or timeout) for (more or less) synced start - expected_clients = len(self._client_queues[queue_id]) - time_started = self._time_started[queue_id] - while True: - await asyncio.sleep(0.1) - if (len(self._subscribers) >= expected_clients) or ( - time() - time_started - ) > 5: - await asyncio.gather( - *[ - self.mass.players.get_player(client_id).play() - for client_id in self._subscribers.get(queue_id, {}) - ] - ) - break - - self.mass.create_task(unpause_synced_start) - async def stop_multi_client_queue_stream(self, queue_id: str) -> None: """Signal a running queue stream task and its listeners to stop.""" if queue_id not in self._stream_tasks: @@ -265,14 +252,6 @@ class StreamController: """Distribute audio chunks over connected clients in a multi client queue stream.""" queue = self.mass.players.get_player_queue(queue_id) - def cleanup_client_queue(player_id: str): - if client_queue := self._client_queues[queue_id].get(player_id, None): - self.logger.debug("cleaning up child queue %s", player_id) - for _ in range(client_queue.qsize()): - client_queue.get_nowait() - client_queue.task_done() - client_queue.put_nowait(b"") - start_streamdetails = await queue.queue_stream_prepare() sox_args = await get_sox_args_for_pcm_stream( start_streamdetails.sample_rate, @@ -288,7 +267,7 @@ class StreamController: async with AsyncProcess(sox_args, True) as sox_proc: async def writer(): - # task that sends the raw pcm audio to the sox/ffmpeg process + """Task that sends the raw pcm audio to the sox/ffmpeg process.""" async for audio_chunk in self._get_queue_stream( queue, sample_rate=start_streamdetails.sample_rate, @@ -302,19 +281,19 @@ class StreamController: sox_proc.write_eof() async def reader(): - # read bytes from final output + """Read bytes from final output and put chunk on child queues.""" chunks_sent = 0 - chunksize = 32000 if output_fmt == ContentType.MP3 else 90000 - async for chunk in sox_proc.iterate_chunks(chunksize): + async for chunk in sox_proc.iterate_chunks(256000): chunks_sent += 1 coros = [] for player_id in list(self._client_queues[queue_id].keys()): if ( - chunks_sent >= 20 + self._client_queues[queue_id][player_id].full() + and chunks_sent >= 10 and player_id not in self._subscribers[queue_id] ): - # assume client did not connect or got disconnected somehow - cleanup_client_queue(player_id) + # assume client did not connect at all or got disconnected somehow + self.__cleanup_client_queue(queue_id, player_id) self._client_queues[queue_id].pop(player_id, None) else: coros.append( @@ -322,20 +301,34 @@ class StreamController: ) await asyncio.gather(*coros) + # launch the reader and writer await asyncio.gather(*[writer(), reader()]) - # send empty chunk to inform EOF - await asyncio.gather( - *[cq.put(b"") for cq in self._client_queues[queue_id].values()] - ) + # wait for all queues to consume their data + await asyncio.gather( + *[cq.join() for cq in self._client_queues[queue_id].values()] + ) + # send empty chunk to inform EOF + await asyncio.gather( + *[cq.put(b"") for cq in self._client_queues[queue_id].values()] + ) finally: + self.logger.debug("Multi client queue stream %s finished", queue.queue_id) # cleanup self._stream_tasks.pop(queue_id, None) for player_id in list(self._client_queues[queue_id].keys()): - cleanup_client_queue(player_id) + self.__cleanup_client_queue(queue_id, player_id) self.logger.debug("Multi client queue stream %s ended", queue.queue_id) + def __cleanup_client_queue(self, queue_id: str, player_id: str): + """Cleanup a client queue after it completes/disconnects.""" + if client_queue := self._client_queues.get(queue_id, {}).pop(player_id, None): + for _ in range(client_queue.qsize()): + client_queue.get_nowait() + client_queue.task_done() + client_queue.put_nowait(b"") + async def _get_queue_stream( self, queue: PlayerQueue, @@ -348,7 +341,6 @@ class StreamController: last_fadeout_data = b"" queue_index = None track_count = 0 - start_timestamp = time() pcm_fmt = ContentType.from_bit_depth(bit_depth) self.logger.info( @@ -360,6 +352,7 @@ class StreamController: # stream queue tracks one by one while True: + start_timestamp = time() # get the (next) track in queue track_count += 1 if track_count == 1: @@ -519,11 +512,12 @@ class StreamController: else: prev_chunk = chunk del chunk - # guard for clients buffering too much + # allow clients to only buffer max ~15 seconds ahead seconds_streamed = bytes_written / sample_size - seconds_needed = int(time() - start_timestamp) - if (seconds_streamed - seconds_needed) >= 10: - await asyncio.sleep(8) + seconds_needed = int(time() - start_timestamp) + 15 + diff = seconds_streamed - seconds_needed + if diff: + await asyncio.sleep(diff) # end of the track reached # update actual duration to the queue for more accurate now playing info accurate_duration = bytes_written / sample_size diff --git a/music_assistant/models/player.py b/music_assistant/models/player.py index 6da267c1..5e8a88fb 100755 --- a/music_assistant/models/player.py +++ b/music_assistant/models/player.py @@ -1,6 +1,7 @@ """Models and helpers for a player.""" from __future__ import annotations +import asyncio from abc import ABC from dataclasses import dataclass from enum import Enum, IntEnum @@ -50,7 +51,7 @@ class Player(ABC): _attr_group_childs: List[str] = [] _attr_name: str = "" _attr_powered: bool = False - _attr_elapsed_time: int = 0 + _attr_elapsed_time: float = 0 _attr_current_url: str = "" _attr_state: PlayerState = PlayerState.IDLE _attr_available: bool = True @@ -91,12 +92,12 @@ class Player(ABC): return self._attr_powered @property - def elapsed_time(self) -> int: + def elapsed_time(self) -> float: """Return elapsed time of current playing media in seconds.""" return self._attr_elapsed_time @property - def corrected_elapsed_time(self) -> int: + def corrected_elapsed_time(self) -> float: """Return corrected elapsed time of current playing media in seconds.""" return self._attr_elapsed_time + (time() - self._last_elapsed_time_received) @@ -150,7 +151,7 @@ class Player(ABC): at more or less the same time. The player's implementation will be responsible for synchronization of audio on child players (if possible), Music Assistant will only coordinate the start and makes sure that every child received the same audio chunk - within the same timespan. Multi stream is currently limited to 44100/16 only. + within the same timespan. """ return self._attr_use_multi_stream @@ -230,14 +231,11 @@ class Player(ABC): if "elapsed_time" in changed_keys: self._last_elapsed_time_received = time() - elif "state" in changed_keys and self.state == PlayerState.PLAYING: - self._attr_elapsed_time = 0 - self._last_elapsed_time_received = time() # always update the playerqueue self.mass.players.get_player_queue(self.player_id).on_player_update() - if len(changed_keys) == 0 or changed_keys == {"elapsed_time"}: + if len(changed_keys) == 0 and changed_keys != {"corrected_elapsed_time"}: return self._prev_state = cur_state @@ -290,7 +288,8 @@ class Player(ABC): "player_id": self.player_id, "name": self.name, "powered": self.powered, - "elapsed_time": self.elapsed_time, + "elapsed_time": int(self.elapsed_time), + "corrected_elapsed_time": int(self.corrected_elapsed_time), "state": self.state.value, "available": self.available, "is_group": self.is_group, @@ -307,7 +306,7 @@ class PlayerGroup(Player): is_group: bool = True _attr_group_childs: List[str] = [] - _attr_support_join_control: bool = True + _correct_progress = set() @property def volume_level(self) -> int: @@ -318,13 +317,87 @@ class PlayerGroup(Player): # may be overridden if implementation provides this natively group_volume = 0 active_players = 0 - for child_player in self._get_players(True): + for child_player in self._get_child_players(True): group_volume += child_player.volume_level active_players += 1 if active_players: group_volume = group_volume / active_players return int(group_volume) + @property + def corrected_elapsed_time(self) -> float: + """Return the corrected/precise elsapsed time of the grouped player.""" + if not self.use_multi_stream: + return super().corrected_elapsed_time + # calculate from group childs + for child_player in self._get_child_players(True): + if not child_player.current_url: + continue + if self.player_id not in child_player.current_url: + continue + # if child_player.state not in [PlayerState.PLAYING, PlayerState.PAUSED]: + # continue + return child_player.corrected_elapsed_time + return 0 + + @property + def state(self) -> PlayerState: + """Return the state of the grouped player.""" + if not self.use_multi_stream: + return super().state + # calculate from group childs + for child_player in self._get_child_players(True): + if not child_player.current_url: + continue + if self.player_id not in child_player.current_url: + continue + if child_player.state not in [PlayerState.PLAYING, PlayerState.PAUSED]: + continue + return child_player.state + return super().state + + @property + def current_url(self) -> str: + """Return the current_url of the grouped player.""" + if not self.use_multi_stream: + return super().current_url + # calculate from group childs + for child_player in self._get_child_players(True): + if not child_player.current_url: + continue + if self.player_id not in child_player.current_url: + continue + return child_player.current_url + return super().current_url + + async def stop(self) -> None: + """Send STOP command to player.""" + if not self.use_multi_stream: + return await super().stop() + # redirect command to all child players + await asyncio.gather(*[x.stop() for x in self._get_child_players(True)]) + + async def play(self) -> None: + """Send PLAY/UNPAUSE command to player.""" + if not self.use_multi_stream: + return await super().play() + # redirect command to all child players + await asyncio.gather(*[x.play() for x in self._get_child_players(True)]) + + async def pause(self) -> None: + """Send PAUSE command to player.""" + if not self.use_multi_stream: + return await super().pause() + # redirect command to all child players + await asyncio.gather(*[x.pause() for x in self._get_child_players(True)]) + + async def power(self, powered: bool) -> None: + """Send POWER command to player.""" + if not self.use_multi_stream: + return await super().power(powered) + # redirect command to all child players + await asyncio.gather(*[x.power(powered) for x in self._get_child_players(True)]) + async def volume_set(self, volume_level: int) -> None: """Send volume level (0..100) command to player.""" # handle group volume by only applying the valume to powered childs @@ -336,24 +409,32 @@ class PlayerGroup(Player): volume_dif_percent = 1 + (new_volume / 100) else: volume_dif_percent = volume_dif / cur_volume - for child_player in self._get_players(True): + for child_player in self._get_child_players(True): cur_child_volume = child_player.volume_level new_child_volume = cur_child_volume + ( cur_child_volume * volume_dif_percent ) await child_player.volume_set(new_child_volume) - def _get_players(self, only_powered: bool = False) -> List[Player]: + def _get_child_players( + self, only_powered: bool = False, only_playing: bool = False + ) -> List[Player]: """Get players attached to this group.""" - return [ - x - for x in self.mass.players - if x.player_id in self.group_childs and x.powered or not only_powered - ] + if not self.mass: + return [] + child_players = [] + for child_id in self.group_childs: + if child_player := self.mass.players.get_player(child_id): + if not (not only_powered or child_player.powered): + continue + if not (not only_playing or child_player.state == PlayerState.PLAYING): + continue + child_players.append(child_player) + return child_players def on_child_update(self, player_id: str, changed_keys: set) -> None: """Call when one of the child players of a playergroup updates.""" - super().on_child_update(player_id, changed_keys) + self.update_state(True) if "powered" in changed_keys: # convenience helper: # power off group player if last child player turns off diff --git a/music_assistant/models/player_queue.py b/music_assistant/models/player_queue.py index e516f069..55181363 100644 --- a/music_assistant/models/player_queue.py +++ b/music_assistant/models/player_queue.py @@ -3,7 +3,6 @@ from __future__ import annotations import asyncio import random -import time from asyncio import Task, TimerHandle from dataclasses import dataclass from enum import Enum @@ -17,7 +16,7 @@ from music_assistant.helpers.typing import MusicAssistant from music_assistant.models.errors import MediaNotFoundError, QueueEmpty from music_assistant.models.media_items import ContentType, MediaType, StreamDetails -from .player import Player, PlayerState +from .player import Player, PlayerGroup, PlayerState if TYPE_CHECKING: from music_assistant.models.media_items import Radio, Track @@ -89,7 +88,6 @@ class PlayerQueue: self._save_task: TimerHandle = None self._update_task: Task = None self._signal_next: bool = False - self._last_player_update: int = 0 self._stream_url: str = self.mass.players.streams.get_stream_url(self.queue_id) async def setup(self) -> None: @@ -98,20 +96,22 @@ class PlayerQueue: self.mass.signal_event(EventType.QUEUE_ADDED, self) @property - def player(self) -> Player: + def player(self) -> Player | PlayerGroup: """Return the player attached to this queue.""" return self.mass.players.get_player(self.queue_id, include_unavailable=True) @property def active(self) -> bool: """Return bool if the queue is currenty active on the player.""" + if self.player.use_multi_stream: + return self.queue_id in self.player.current_url return self._stream_url == self.player.current_url @property - def elapsed_time(self) -> int: + def elapsed_time(self) -> float: """Return elapsed time of current playing media in seconds.""" if not self.active: - return self.player.elapsed_time + return self.player.corrected_elapsed_time return self._current_item_time @property @@ -380,7 +380,7 @@ class PlayerQueue: if self.player.use_multi_stream: # multi stream enabled, all child players should receive the same audio stream # redirect command to all (powered) players - tasks = [] + coros = [] expected_clients = set() for child_id in self.player.group_childs: if child_player := self.mass.players.get_player(child_id): @@ -389,12 +389,11 @@ class PlayerQueue: self.queue_id, child_id ) expected_clients.add(child_id) - tasks.append(child_player.play_url(player_url)) - tasks.append(child_player.pause()) + coros.append(child_player.play_url(player_url)) await self.mass.players.streams.start_multi_client_queue_stream( self.queue_id, expected_clients, ContentType.FLAC ) - await asyncio.gather(*tasks) + await asyncio.gather(*coros) else: # regular (single player) request await self.player.play_url(self._stream_url) @@ -496,20 +495,18 @@ class PlayerQueue: def on_player_update(self) -> None: """Call when player updates.""" - self._last_player_update = time.time() if self._last_state != self.player.state: self._last_state = self.player.state # handle case where stream stopped on purpose and we need to restart it if self.player.state != PlayerState.PLAYING and self._signal_next: self._signal_next = False - self.mass.create_task(self.play()) + self.mass.create_task(self.resume()) # start updater task if needed if self.player.state == PlayerState.PLAYING: if not self._update_task: self._update_task = self.mass.create_task(self.__update_task()) - else: - if self._update_task: - self._update_task.cancel() + elif self._update_task: + self._update_task.cancel() self._update_task = None if not self.update_state(): @@ -518,11 +515,16 @@ class PlayerQueue: def update_state(self) -> bool: """Update queue details, called when player updates.""" + if self.player.active_queue.queue_id != self.queue_id: + return new_index = self._current_index track_time = self._current_item_time new_item_loaded = False - # if self.player.state == PlayerState.PLAYING and self.elapsed_time > 1: - if self.player.state == PlayerState.PLAYING: + # if self.player.state == PlayerState.PLAYING: + if ( + self.player.state == PlayerState.PLAYING + and self.player.corrected_elapsed_time > 0 + ): new_index, track_time = self.__get_queue_stream_index() # process new index if self._current_index != new_index: @@ -597,10 +599,10 @@ class PlayerQueue: self._signal_next = True async def __update_task(self) -> None: - """Update player queue every interval.""" + """Update player queue every second while playing.""" while True: - self.update_state() await asyncio.sleep(1) + self.update_state() def __get_queue_stream_index(self) -> Tuple[int, int]: """Calculate current queue index and current track elapsed time."""