use float for elapsed time for more accurate precision
authorMarcel van der Veldt <m.vanderveldt@outlook.com>
Fri, 8 Apr 2022 13:16:09 +0000 (15:16 +0200)
committerMarcel van der Veldt <m.vanderveldt@outlook.com>
Fri, 8 Apr 2022 13:16:09 +0000 (15:16 +0200)
music_assistant/controllers/stream.py
music_assistant/models/player.py
music_assistant/models/player_queue.py

index c3aea3e2787396ca77df1b221d8fcbd3d10b7fba..a3bac42cde3f01407734089ed0b7cf931ad5a1bc 100644 (file)
@@ -19,7 +19,7 @@ from music_assistant.helpers.audio import (
 from music_assistant.helpers.process import AsyncProcess
 from music_assistant.helpers.typing import MusicAssistant
 from music_assistant.helpers.util import get_ip
-from music_assistant.models.errors import MediaNotFoundError
+from music_assistant.models.errors import MediaNotFoundError, MusicAssistantError
 from music_assistant.models.media_items import ContentType
 from music_assistant.models.player_queue import PlayerQueue
 
@@ -161,18 +161,24 @@ class StreamController:
         await self.subscribe_client(queue_id, player_id)
         try:
             while True:
-                audio_chunk = await self._client_queues[queue_id][player_id].get()
-                await resp.write(audio_chunk)
-                self._client_queues[queue_id][player_id].task_done()
+                client_queue = self._client_queues.get(queue_id).get(player_id)
+                if not client_queue:
+                    break
+                audio_chunk = await client_queue.get()
                 if audio_chunk == b"":
-                    # last chunk
+                    # eof
                     break
+                await resp.write(audio_chunk)
+                client_queue.task_done()
         finally:
             await self.unsubscribe_client(queue_id, player_id)
         return resp
 
     async def subscribe_client(self, queue_id: str, player_id: str) -> None:
         """Subscribe client to queue stream."""
+        if queue_id not in self._stream_tasks:
+            raise MusicAssistantError(f"No Queue stream available for {queue_id}")
+
         if queue_id not in self._subscribers:
             self._subscribers[queue_id] = set()
         self._subscribers[queue_id].add(player_id)
@@ -188,7 +194,7 @@ class StreamController:
         if player_id in self._subscribers[queue_id]:
             self._subscribers[queue_id].remove(player_id)
 
-        self._client_queues.get(queue_id, {}).pop(player_id, None)
+        self.__cleanup_client_queue(queue_id, player_id)
         self.logger.debug(
             "Unsubscribed player %s from multi queue stream %s", player_id, queue_id
         )
@@ -217,25 +223,6 @@ class StreamController:
             self.__multi_client_queue_stream_runner(queue_id, output_fmt)
         )
 
-        async def unpause_synced_start():
-            # unpause if we got all clients (or timeout) for (more or less) synced start
-            expected_clients = len(self._client_queues[queue_id])
-            time_started = self._time_started[queue_id]
-            while True:
-                await asyncio.sleep(0.1)
-                if (len(self._subscribers) >= expected_clients) or (
-                    time() - time_started
-                ) > 5:
-                    await asyncio.gather(
-                        *[
-                            self.mass.players.get_player(client_id).play()
-                            for client_id in self._subscribers.get(queue_id, {})
-                        ]
-                    )
-                    break
-
-        self.mass.create_task(unpause_synced_start)
-
     async def stop_multi_client_queue_stream(self, queue_id: str) -> None:
         """Signal a running queue stream task and its listeners to stop."""
         if queue_id not in self._stream_tasks:
@@ -265,14 +252,6 @@ class StreamController:
         """Distribute audio chunks over connected clients in a multi client queue stream."""
         queue = self.mass.players.get_player_queue(queue_id)
 
-        def cleanup_client_queue(player_id: str):
-            if client_queue := self._client_queues[queue_id].get(player_id, None):
-                self.logger.debug("cleaning up child queue %s", player_id)
-                for _ in range(client_queue.qsize()):
-                    client_queue.get_nowait()
-                    client_queue.task_done()
-                client_queue.put_nowait(b"")
-
         start_streamdetails = await queue.queue_stream_prepare()
         sox_args = await get_sox_args_for_pcm_stream(
             start_streamdetails.sample_rate,
@@ -288,7 +267,7 @@ class StreamController:
             async with AsyncProcess(sox_args, True) as sox_proc:
 
                 async def writer():
-                    # task that sends the raw pcm audio to the sox/ffmpeg process
+                    """Task that sends the raw pcm audio to the sox/ffmpeg process."""
                     async for audio_chunk in self._get_queue_stream(
                         queue,
                         sample_rate=start_streamdetails.sample_rate,
@@ -302,19 +281,19 @@ class StreamController:
                     sox_proc.write_eof()
 
                 async def reader():
-                    # read bytes from final output
+                    """Read bytes from final output and put chunk on child queues."""
                     chunks_sent = 0
-                    chunksize = 32000 if output_fmt == ContentType.MP3 else 90000
-                    async for chunk in sox_proc.iterate_chunks(chunksize):
+                    async for chunk in sox_proc.iterate_chunks(256000):
                         chunks_sent += 1
                         coros = []
                         for player_id in list(self._client_queues[queue_id].keys()):
                             if (
-                                chunks_sent >= 20
+                                self._client_queues[queue_id][player_id].full()
+                                and chunks_sent >= 10
                                 and player_id not in self._subscribers[queue_id]
                             ):
-                                # assume client did not connect or got disconnected somehow
-                                cleanup_client_queue(player_id)
+                                # assume client did not connect at all or got disconnected somehow
+                                self.__cleanup_client_queue(queue_id, player_id)
                                 self._client_queues[queue_id].pop(player_id, None)
                             else:
                                 coros.append(
@@ -322,20 +301,34 @@ class StreamController:
                                 )
                         await asyncio.gather(*coros)
 
+                # launch the reader and writer
                 await asyncio.gather(*[writer(), reader()])
-            # send empty chunk to inform EOF
-            await asyncio.gather(
-                *[cq.put(b"") for cq in self._client_queues[queue_id].values()]
-            )
+                # wait for all queues to consume their data
+                await asyncio.gather(
+                    *[cq.join() for cq in self._client_queues[queue_id].values()]
+                )
+                # send empty chunk to inform EOF
+                await asyncio.gather(
+                    *[cq.put(b"") for cq in self._client_queues[queue_id].values()]
+                )
 
         finally:
+            self.logger.debug("Multi client queue stream %s finished", queue.queue_id)
             # cleanup
             self._stream_tasks.pop(queue_id, None)
             for player_id in list(self._client_queues[queue_id].keys()):
-                cleanup_client_queue(player_id)
+                self.__cleanup_client_queue(queue_id, player_id)
 
             self.logger.debug("Multi client queue stream %s ended", queue.queue_id)
 
+    def __cleanup_client_queue(self, queue_id: str, player_id: str):
+        """Cleanup a client queue after it completes/disconnects."""
+        if client_queue := self._client_queues.get(queue_id, {}).pop(player_id, None):
+            for _ in range(client_queue.qsize()):
+                client_queue.get_nowait()
+                client_queue.task_done()
+            client_queue.put_nowait(b"")
+
     async def _get_queue_stream(
         self,
         queue: PlayerQueue,
@@ -348,7 +341,6 @@ class StreamController:
         last_fadeout_data = b""
         queue_index = None
         track_count = 0
-        start_timestamp = time()
 
         pcm_fmt = ContentType.from_bit_depth(bit_depth)
         self.logger.info(
@@ -360,6 +352,7 @@ class StreamController:
 
         # stream queue tracks one by one
         while True:
+            start_timestamp = time()
             # get the (next) track in queue
             track_count += 1
             if track_count == 1:
@@ -519,11 +512,12 @@ class StreamController:
                     else:
                         prev_chunk = chunk
                     del chunk
-                # guard for clients buffering too much
+                # allow clients to only buffer max ~15 seconds ahead
                 seconds_streamed = bytes_written / sample_size
-                seconds_needed = int(time() - start_timestamp)
-                if (seconds_streamed - seconds_needed) >= 10:
-                    await asyncio.sleep(8)
+                seconds_needed = int(time() - start_timestamp) + 15
+                diff = seconds_streamed - seconds_needed
+                if diff:
+                    await asyncio.sleep(diff)
             # end of the track reached
             # update actual duration to the queue for more accurate now playing info
             accurate_duration = bytes_written / sample_size
index 6da267c16de94cf0ae296e18bab87fe4fd35a40f..5e8a88fb24feb1c5fed89144e5b8db6657bf5bfe 100755 (executable)
@@ -1,6 +1,7 @@
 """Models and helpers for a player."""
 from __future__ import annotations
 
+import asyncio
 from abc import ABC
 from dataclasses import dataclass
 from enum import Enum, IntEnum
@@ -50,7 +51,7 @@ class Player(ABC):
     _attr_group_childs: List[str] = []
     _attr_name: str = ""
     _attr_powered: bool = False
-    _attr_elapsed_time: int = 0
+    _attr_elapsed_time: float = 0
     _attr_current_url: str = ""
     _attr_state: PlayerState = PlayerState.IDLE
     _attr_available: bool = True
@@ -91,12 +92,12 @@ class Player(ABC):
         return self._attr_powered
 
     @property
-    def elapsed_time(self) -> int:
+    def elapsed_time(self) -> float:
         """Return elapsed time of current playing media in seconds."""
         return self._attr_elapsed_time
 
     @property
-    def corrected_elapsed_time(self) -> int:
+    def corrected_elapsed_time(self) -> float:
         """Return corrected elapsed time of current playing media in seconds."""
         return self._attr_elapsed_time + (time() - self._last_elapsed_time_received)
 
@@ -150,7 +151,7 @@ class Player(ABC):
         at more or less the same time. The player's implementation will be responsible for
         synchronization of audio on child players (if possible), Music Assistant will only
         coordinate the start and makes sure that every child received the same audio chunk
-        within the same timespan. Multi stream is currently limited to 44100/16 only.
+        within the same timespan.
         """
         return self._attr_use_multi_stream
 
@@ -230,14 +231,11 @@ class Player(ABC):
 
         if "elapsed_time" in changed_keys:
             self._last_elapsed_time_received = time()
-        elif "state" in changed_keys and self.state == PlayerState.PLAYING:
-            self._attr_elapsed_time = 0
-            self._last_elapsed_time_received = time()
 
         # always update the playerqueue
         self.mass.players.get_player_queue(self.player_id).on_player_update()
 
-        if len(changed_keys) == 0 or changed_keys == {"elapsed_time"}:
+        if len(changed_keys) == 0 and changed_keys != {"corrected_elapsed_time"}:
             return
 
         self._prev_state = cur_state
@@ -290,7 +288,8 @@ class Player(ABC):
             "player_id": self.player_id,
             "name": self.name,
             "powered": self.powered,
-            "elapsed_time": self.elapsed_time,
+            "elapsed_time": int(self.elapsed_time),
+            "corrected_elapsed_time": int(self.corrected_elapsed_time),
             "state": self.state.value,
             "available": self.available,
             "is_group": self.is_group,
@@ -307,7 +306,7 @@ class PlayerGroup(Player):
 
     is_group: bool = True
     _attr_group_childs: List[str] = []
-    _attr_support_join_control: bool = True
+    _correct_progress = set()
 
     @property
     def volume_level(self) -> int:
@@ -318,13 +317,87 @@ class PlayerGroup(Player):
         # may be overridden if implementation provides this natively
         group_volume = 0
         active_players = 0
-        for child_player in self._get_players(True):
+        for child_player in self._get_child_players(True):
             group_volume += child_player.volume_level
             active_players += 1
         if active_players:
             group_volume = group_volume / active_players
         return int(group_volume)
 
+    @property
+    def corrected_elapsed_time(self) -> float:
+        """Return the corrected/precise elsapsed time of the grouped player."""
+        if not self.use_multi_stream:
+            return super().corrected_elapsed_time
+        # calculate from group childs
+        for child_player in self._get_child_players(True):
+            if not child_player.current_url:
+                continue
+            if self.player_id not in child_player.current_url:
+                continue
+            # if child_player.state not in [PlayerState.PLAYING, PlayerState.PAUSED]:
+            #     continue
+            return child_player.corrected_elapsed_time
+        return 0
+
+    @property
+    def state(self) -> PlayerState:
+        """Return the state of the grouped player."""
+        if not self.use_multi_stream:
+            return super().state
+        # calculate from group childs
+        for child_player in self._get_child_players(True):
+            if not child_player.current_url:
+                continue
+            if self.player_id not in child_player.current_url:
+                continue
+            if child_player.state not in [PlayerState.PLAYING, PlayerState.PAUSED]:
+                continue
+            return child_player.state
+        return super().state
+
+    @property
+    def current_url(self) -> str:
+        """Return the current_url of the grouped player."""
+        if not self.use_multi_stream:
+            return super().current_url
+        # calculate from group childs
+        for child_player in self._get_child_players(True):
+            if not child_player.current_url:
+                continue
+            if self.player_id not in child_player.current_url:
+                continue
+            return child_player.current_url
+        return super().current_url
+
+    async def stop(self) -> None:
+        """Send STOP command to player."""
+        if not self.use_multi_stream:
+            return await super().stop()
+        # redirect command to all child players
+        await asyncio.gather(*[x.stop() for x in self._get_child_players(True)])
+
+    async def play(self) -> None:
+        """Send PLAY/UNPAUSE command to player."""
+        if not self.use_multi_stream:
+            return await super().play()
+        # redirect command to all child players
+        await asyncio.gather(*[x.play() for x in self._get_child_players(True)])
+
+    async def pause(self) -> None:
+        """Send PAUSE command to player."""
+        if not self.use_multi_stream:
+            return await super().pause()
+        # redirect command to all child players
+        await asyncio.gather(*[x.pause() for x in self._get_child_players(True)])
+
+    async def power(self, powered: bool) -> None:
+        """Send POWER command to player."""
+        if not self.use_multi_stream:
+            return await super().power(powered)
+        # redirect command to all child players
+        await asyncio.gather(*[x.power(powered) for x in self._get_child_players(True)])
+
     async def volume_set(self, volume_level: int) -> None:
         """Send volume level (0..100) command to player."""
         # handle group volume by only applying the valume to powered childs
@@ -336,24 +409,32 @@ class PlayerGroup(Player):
             volume_dif_percent = 1 + (new_volume / 100)
         else:
             volume_dif_percent = volume_dif / cur_volume
-        for child_player in self._get_players(True):
+        for child_player in self._get_child_players(True):
             cur_child_volume = child_player.volume_level
             new_child_volume = cur_child_volume + (
                 cur_child_volume * volume_dif_percent
             )
             await child_player.volume_set(new_child_volume)
 
-    def _get_players(self, only_powered: bool = False) -> List[Player]:
+    def _get_child_players(
+        self, only_powered: bool = False, only_playing: bool = False
+    ) -> List[Player]:
         """Get players attached to this group."""
-        return [
-            x
-            for x in self.mass.players
-            if x.player_id in self.group_childs and x.powered or not only_powered
-        ]
+        if not self.mass:
+            return []
+        child_players = []
+        for child_id in self.group_childs:
+            if child_player := self.mass.players.get_player(child_id):
+                if not (not only_powered or child_player.powered):
+                    continue
+                if not (not only_playing or child_player.state == PlayerState.PLAYING):
+                    continue
+                child_players.append(child_player)
+        return child_players
 
     def on_child_update(self, player_id: str, changed_keys: set) -> None:
         """Call when one of the child players of a playergroup updates."""
-        super().on_child_update(player_id, changed_keys)
+        self.update_state(True)
         if "powered" in changed_keys:
             # convenience helper:
             # power off group player if last child player turns off
index e516f0698caaf0b94e54fee6a3b2b0e071d6d24d..55181363301cef4bd41045bd48c64f2d82d49606 100644 (file)
@@ -3,7 +3,6 @@ from __future__ import annotations
 
 import asyncio
 import random
-import time
 from asyncio import Task, TimerHandle
 from dataclasses import dataclass
 from enum import Enum
@@ -17,7 +16,7 @@ from music_assistant.helpers.typing import MusicAssistant
 from music_assistant.models.errors import MediaNotFoundError, QueueEmpty
 from music_assistant.models.media_items import ContentType, MediaType, StreamDetails
 
-from .player import Player, PlayerState
+from .player import Player, PlayerGroup, PlayerState
 
 if TYPE_CHECKING:
     from music_assistant.models.media_items import Radio, Track
@@ -89,7 +88,6 @@ class PlayerQueue:
         self._save_task: TimerHandle = None
         self._update_task: Task = None
         self._signal_next: bool = False
-        self._last_player_update: int = 0
         self._stream_url: str = self.mass.players.streams.get_stream_url(self.queue_id)
 
     async def setup(self) -> None:
@@ -98,20 +96,22 @@ class PlayerQueue:
         self.mass.signal_event(EventType.QUEUE_ADDED, self)
 
     @property
-    def player(self) -> Player:
+    def player(self) -> Player | PlayerGroup:
         """Return the player attached to this queue."""
         return self.mass.players.get_player(self.queue_id, include_unavailable=True)
 
     @property
     def active(self) -> bool:
         """Return bool if the queue is currenty active on the player."""
+        if self.player.use_multi_stream:
+            return self.queue_id in self.player.current_url
         return self._stream_url == self.player.current_url
 
     @property
-    def elapsed_time(self) -> int:
+    def elapsed_time(self) -> float:
         """Return elapsed time of current playing media in seconds."""
         if not self.active:
-            return self.player.elapsed_time
+            return self.player.corrected_elapsed_time
         return self._current_item_time
 
     @property
@@ -380,7 +380,7 @@ class PlayerQueue:
         if self.player.use_multi_stream:
             # multi stream enabled, all child players should receive the same audio stream
             # redirect command to all (powered) players
-            tasks = []
+            coros = []
             expected_clients = set()
             for child_id in self.player.group_childs:
                 if child_player := self.mass.players.get_player(child_id):
@@ -389,12 +389,11 @@ class PlayerQueue:
                             self.queue_id, child_id
                         )
                         expected_clients.add(child_id)
-                        tasks.append(child_player.play_url(player_url))
-                        tasks.append(child_player.pause())
+                        coros.append(child_player.play_url(player_url))
             await self.mass.players.streams.start_multi_client_queue_stream(
                 self.queue_id, expected_clients, ContentType.FLAC
             )
-            await asyncio.gather(*tasks)
+            await asyncio.gather(*coros)
         else:
             # regular (single player) request
             await self.player.play_url(self._stream_url)
@@ -496,20 +495,18 @@ class PlayerQueue:
 
     def on_player_update(self) -> None:
         """Call when player updates."""
-        self._last_player_update = time.time()
         if self._last_state != self.player.state:
             self._last_state = self.player.state
             # handle case where stream stopped on purpose and we need to restart it
             if self.player.state != PlayerState.PLAYING and self._signal_next:
                 self._signal_next = False
-                self.mass.create_task(self.play())
+                self.mass.create_task(self.resume())
             # start updater task if needed
             if self.player.state == PlayerState.PLAYING:
                 if not self._update_task:
                     self._update_task = self.mass.create_task(self.__update_task())
-            else:
-                if self._update_task:
-                    self._update_task.cancel()
+            elif self._update_task:
+                self._update_task.cancel()
                 self._update_task = None
 
         if not self.update_state():
@@ -518,11 +515,16 @@ class PlayerQueue:
 
     def update_state(self) -> bool:
         """Update queue details, called when player updates."""
+        if self.player.active_queue.queue_id != self.queue_id:
+            return
         new_index = self._current_index
         track_time = self._current_item_time
         new_item_loaded = False
-        # if self.player.state == PlayerState.PLAYING and self.elapsed_time > 1:
-        if self.player.state == PlayerState.PLAYING:
+        # if self.player.state == PlayerState.PLAYING:
+        if (
+            self.player.state == PlayerState.PLAYING
+            and self.player.corrected_elapsed_time > 0
+        ):
             new_index, track_time = self.__get_queue_stream_index()
         # process new index
         if self._current_index != new_index:
@@ -597,10 +599,10 @@ class PlayerQueue:
         self._signal_next = True
 
     async def __update_task(self) -> None:
-        """Update player queue every interval."""
+        """Update player queue every second while playing."""
         while True:
-            self.update_state()
             await asyncio.sleep(1)
+            self.update_state()
 
     def __get_queue_stream_index(self) -> Tuple[int, int]:
         """Calculate current queue index and current track elapsed time."""