improve multi client streaming
authorMarcel van der Veldt <m.vanderveldt@outlook.com>
Thu, 7 Apr 2022 23:44:14 +0000 (01:44 +0200)
committerMarcel van der Veldt <m.vanderveldt@outlook.com>
Thu, 7 Apr 2022 23:44:14 +0000 (01:44 +0200)
music_assistant/controllers/stream.py
music_assistant/models/player.py
music_assistant/models/player_queue.py

index 741acc81aba3980a2a3b74d2e0c19e2614404d71..ea6823b46a1aa9975ee26dfcbcd1c1e59c7426ce 100644 (file)
@@ -4,8 +4,7 @@ from __future__ import annotations
 import asyncio
 from asyncio import Task
 from time import time
-from typing import AsyncGenerator, Awaitable, Callable, Dict, List
-from uuid import uuid4
+from typing import AsyncGenerator, Dict, Optional, Set
 
 from aiohttp import web
 from music_assistant.constants import EventType
@@ -35,28 +34,30 @@ class StreamController:
         self.logger = mass.logger.getChild("stream")
         self._port = port
         self._ip: str = get_ip()
-        self._subscribers: Dict[str, Dict[str, List[Callable]]] = {}
+        self._subscribers: Dict[str, Set[str]] = {}
+        self._client_queues: Dict[str, Dict[str, asyncio.Queue]] = {}
         self._stream_tasks: Dict[str, Task] = {}
+        self._time_started: Dict[str, float] = {}
 
-    def get_stream_url(self, queue_id: str) -> str:
+    def get_stream_url(
+        self, queue_id: str, child_player: Optional[str] = None, fmt: str = "flac"
+    ) -> str:
         """Return the full stream url for the PlayerQueue Stream."""
-        checksum = str(int(time()))
-        queue = self.mass.players.get_player_queue(queue_id)
-        if queue.player.use_multi_stream:
-            return f"http://{self._ip}:{self._port}/multi/{queue_id}.wav?checksum={checksum}"
-        return f"http://{self._ip}:{self._port}/{queue_id}.flac?checksum={checksum}"
+        if child_player:
+            fmt = "wav"
+            return f"http://{self._ip}:{self._port}/{queue_id}/{child_player}.{fmt}"
+        return f"http://{self._ip}:{self._port}/{queue_id}.{fmt}"
 
     async def setup(self) -> None:
         """Async initialize of module."""
         app = web.Application()
 
         app.router.add_get(
-            "/multi/{queue_id}.wav", self.serve_multi_client_queue_stream
-        )
-        app.router.add_get(
-            "/{queue_id}.{format}", self.serve_queue_stream, allow_head=True
+            "/{queue_id}/{player_id}.{format}",
+            self.serve_multi_client_queue_stream,
         )
-        app.router.add_get("/{queue_id}", self.serve_queue_stream, allow_head=True)
+        app.router.add_get("/{queue_id}.{format}", self.serve_queue_stream)
+        app.router.add_get("/{queue_id}", self.serve_queue_stream)
 
         runner = web.AppRunner(app, access_log=None)
         await runner.setup()
@@ -103,8 +104,6 @@ class StreamController:
             status=200, reason="OK", headers={"Content-Type": f"audio/{fmt}"}
         )
         await resp.prepare(request)
-        if request.method == "HEAD":
-            return
 
         start_streamdetails = await queue.queue_stream_prepare()
         output_fmt = ContentType(fmt)
@@ -144,10 +143,11 @@ class StreamController:
     async def serve_multi_client_queue_stream(self, request: web.Request):
         """Serve queue audio stream to multiple (group)clients in the raw PCM format."""
         queue_id = request.match_info["queue_id"]
+        player_id = request.match_info["player_id"]
         queue = self.mass.players.get_player_queue(queue_id)
-        client_id = str(uuid4())
+        player = self.mass.players.get_player(player_id)
 
-        if queue is None:
+        if queue is None or player is None:
             return web.Response(status=404)
 
         # prepare request
@@ -159,8 +159,6 @@ class StreamController:
             },
         )
         await resp.prepare(request)
-        if request.method == "HEAD":
-            return resp
 
         # write wave header
         # multi subscriber queue is (currently) limited to 44100/16 format
@@ -168,60 +166,39 @@ class StreamController:
         await resp.write(wav_header)
 
         # start delivering audio chunks
-        last_chunk_received = asyncio.Event()
+        await self.subscribe_client(queue_id, player_id)
         try:
-
-            async def audio_callback(audio_chunk):
+            while True:
+                audio_chunk = await self._client_queues[queue_id][player_id].get()
+                await resp.write(audio_chunk)
+                self._client_queues[queue_id][player_id].task_done()
                 if audio_chunk == b"":
-                    last_chunk_received.set()
-                    return
-                try:
-                    await resp.write(audio_chunk)
-                except BrokenPipeError:
-                    pass  # race condition
-
-            await self.subscribe_client(queue_id, client_id, audio_callback)
-            await last_chunk_received.wait()
+                    # last chunk
+                    break
         finally:
-            await self.unsubscribe_client(queue_id, client_id)
+            await self.unsubscribe_client(queue_id, player_id)
         return resp
 
-    async def subscribe_client(
-        self, queue_id: str, client_id: str, callback: Awaitable
-    ) -> None:
+    async def subscribe_client(self, queue_id: str, player_id: str) -> None:
         """Subscribe client to queue stream."""
         if queue_id not in self._subscribers:
-            self._subscribers[queue_id] = {}
-        self._subscribers[queue_id][client_id] = callback
-
-        queue = self.mass.players.get_player_queue(queue_id)
-        # calculate number of expected clients
-        expected_clients = 0
-        for child_id in queue.player.group_childs:
-            if player := self.mass.players.get_player(child_id):
-                if player.powered:
-                    expected_clients += 1
-        assert expected_clients, "No clients expected for this stream"
-
-        stream_task = self._stream_tasks.get(queue_id)
-        # we start the stream as soon as we've reached the expected number of clients
-        # TODO: add timeout guard just in case we don't reach the number of expected client
-        if stream_task is None and len(self._subscribers[queue_id]) >= expected_clients:
-            # start the stream task
-            await asyncio.sleep(1)  # relax
-            await self.start_multi_queue_stream(queue_id)
+            self._subscribers[queue_id] = set()
+        self._subscribers[queue_id].add(player_id)
 
         self.logger.debug(
-            "Subscribed client %s to multi queue stream %s",
-            client_id,
-            queue.queue_id,
+            "Subscribed player %s to multi queue stream %s",
+            player_id,
+            queue_id,
         )
 
-    async def unsubscribe_client(self, queue_id: str, clientid: str):
+    async def unsubscribe_client(self, queue_id: str, player_id: str):
         """Unsubscribe client from queue stream."""
-        self._subscribers[queue_id].pop(clientid, None)
+        if player_id in self._subscribers[queue_id]:
+            self._subscribers[queue_id].remove(player_id)
+
+        self._client_queues.get(queue_id, {}).pop(player_id, None)
         self.logger.debug(
-            "Unsubscribed client %s from multi queue stream %s", clientid, queue_id
+            "Unsubscribed player %s from multi queue stream %s", player_id, queue_id
         )
         if len(self._subscribers[queue_id]) == 0:
             # no more clients, cancel stream task
@@ -231,44 +208,111 @@ class StreamController:
                 )
                 task.cancel()
 
-    async def start_multi_queue_stream(self, queue_id: str) -> None:
+    async def start_multi_client_queue_stream(
+        self, queue_id: str, expected_clients: Set[str]
+    ) -> None:
         """Start the Queue stream feeding callbacks of listeners.."""
-        queue = self.mass.players.get_player_queue(queue_id)
         assert queue_id not in self._stream_tasks, "already running!"
 
-        async def queue_task():
-            self.logger.debug("Multi client queue stream %s started", queue.queue_id)
-            try:
-                async for chunk in self._get_queue_stream(
-                    queue, 44100, 16, 2, resample=True
-                ):
-                    if len(self._subscribers[queue_id].values()) == 0:
-                        # just in case of race conditions
-                        return
+        self._time_started[queue_id] = time()
+
+        # create queue for expected clients
+        self._client_queues.setdefault(queue_id, {})
+        for child_id in expected_clients:
+            self._client_queues[queue_id][child_id] = asyncio.Queue(5)
+
+        self._stream_tasks[queue_id] = asyncio.create_task(
+            self.__multi_client_queue_stream_runner(queue_id)
+        )
+
+        async def unpause_synced_start():
+            # unpause if we got all clients (or timeout)
+            expected_clients = len(self._client_queues[queue_id])
+            time_started = self._time_started[queue_id]
+            while True:
+                await asyncio.sleep(0.1)
+                if (len(self._subscribers) >= expected_clients) or (
+                    time() - time_started
+                ) > 5:
                     await asyncio.gather(
                         *[
-                            cb(chunk)
-                            for cb in list(self._subscribers[queue_id].values())
+                            self.mass.players.get_player(client_id).play()
+                            for client_id in self._subscribers.get(queue_id, {})
                         ]
                     )
-            finally:
-                self._stream_tasks.pop(queue_id, None)
-                # send empty chunk to inform EOF
-                await asyncio.gather(
-                    *[cb(b"") for cb in list(self._subscribers[queue_id].values())]
-                )
-                self.logger.debug("Multi client queue stream %s ended", queue.queue_id)
+                    break
 
-        self._stream_tasks[queue_id] = asyncio.create_task(queue_task())
+        self.mass.create_task(unpause_synced_start)
 
-    async def stop_multi_queue_stream(self, queue_id: str) -> None:
+    async def stop_multi_client_queue_stream(self, queue_id: str) -> None:
         """Signal a running queue stream task and its listeners to stop."""
         if queue_id not in self._stream_tasks:
             return
 
+        # send stop to child players
+        await asyncio.gather(
+            *[
+                self.mass.players.get_player(client_id).stop()
+                for client_id in self._subscribers.get(queue_id, {})
+            ]
+        )
+
+        # stop background task
         if stream_task := self._stream_tasks.pop(queue_id, None):
             stream_task.cancel()
-            await stream_task
+
+        # wait for cleanup
+        while len(self._subscribers.get(queue_id, {})) != 0:
+            await asyncio.sleep(0.1)
+        while len(self._client_queues.get(queue_id, {})) != 0:
+            await asyncio.sleep(0.1)
+
+    async def __multi_client_queue_stream_runner(self, queue_id: str):
+        """Distribute audio chunks over connected clients in a multi client queue stream."""
+        queue = self.mass.players.get_player_queue(queue_id)
+        chunks_sent = 0
+
+        def cleanup_client_queue(player_id: str):
+            if client_queue := self._client_queues[queue_id].get(player_id, None):
+                self.logger.debug("cleaning up child queue %s", player_id)
+                for _ in range(client_queue.qsize()):
+                    client_queue.get_nowait()
+                    client_queue.task_done()
+                client_queue.put_nowait(b"")
+
+        self.logger.debug("Multi client queue stream %s started", queue.queue_id)
+        try:
+            async for chunk in self._get_queue_stream(
+                queue, 44100, 16, 2, resample=True
+            ):
+                chunks_sent += 1
+                coros = []
+                disconnected_clients = set()
+                for player_id, client_queue in self._client_queues[queue_id].items():
+                    if (
+                        chunks_sent >= 4
+                        and player_id not in self._subscribers[queue_id]
+                    ):
+                        # assume client did not connect or got disconnected somehow
+                        disconnected_clients.add(player_id)
+                    else:
+                        coros.append(client_queue.put(chunk))
+                await asyncio.gather(*coros)
+                for player_id in disconnected_clients:
+                    cleanup_client_queue(player_id)
+
+            # send empty chunk to inform EOF
+            await asyncio.gather(
+                *[cq.put(b"") for cq in self._client_queues[queue_id].values()]
+            )
+
+        finally:
+            # cleanup
+            self._stream_tasks.pop(queue_id, None)
+            for player_id in list(self._client_queues[queue_id].keys()):
+                cleanup_client_queue(player_id)
+
+            self.logger.debug("Multi client queue stream %s ended", queue.queue_id)
 
     async def _get_queue_stream(
         self,
@@ -455,10 +499,9 @@ class StreamController:
                     del chunk
                 # guard for clients buffering too much
                 seconds_streamed = bytes_written / sample_size
-                seconds_per_chunk = buffer_size / sample_size
-                seconds_needed = int(time() - start_timestamp + seconds_per_chunk)
-                if (seconds_streamed) > seconds_needed:
-                    await asyncio.sleep(seconds_per_chunk / 2)
+                seconds_needed = int(time() - start_timestamp)
+                if (seconds_streamed - seconds_needed) >= 10:
+                    await asyncio.sleep(8)
             # end of the track reached
             # update actual duration to the queue for more accurate now playing info
             accurate_duration = bytes_written / sample_size
index 3402f2100c3249f4e8650106aa07096dfdb8e2cb..6da267c16de94cf0ae296e18bab87fe4fd35a40f 100755 (executable)
@@ -274,11 +274,14 @@ class Player(ABC):
             player = self.mass.players.get_player(player_id)
             if not player or not player.powered:
                 continue
+            # if player.state not in [PlayerState.PLAYING, PlayerState.PAUSED]:
+            #     continue
             queue = self.mass.players.get_player_queue(player_id)
             if not queue or not queue.active:
                 continue
-            # match found!
-            return queue.queue_id
+            if queue.queue_id in player.current_url:
+                # match found!
+                return queue.queue_id
         return self.player_id
 
     def to_dict(self) -> Dict[str, Any]:
index 0e15bfba336a5d401b7a47f7d42017b3c2b4b8ca..63cc47dfd234abb201fb572798c7a7f7a376510e 100644 (file)
@@ -90,7 +90,7 @@ class PlayerQueue:
         self._update_task: Task = None
         self._signal_next: bool = False
         self._last_player_update: int = 0
-        self._stream_url: Optional[str] = None
+        self._stream_url: str = self.mass.players.streams.get_stream_url(self.queue_id)
 
     async def setup(self) -> None:
         """Handle async setup of instance."""
@@ -362,6 +362,10 @@ class PlayerQueue:
 
     async def play_index(self, index: Union[int, str]) -> None:
         """Play item at index (or item_id) X in queue."""
+        if self.player.use_multi_stream:
+            await self.mass.players.streams.stop_multi_client_queue_stream(
+                self.queue_id
+            )
         if not isinstance(index, int):
             index = self.index_by_id(index)
         if index is None:
@@ -370,13 +374,30 @@ class PlayerQueue:
             return
         self._current_index = index
         self._next_start_index = index
-
         # send stream url to player connected to this queue
-        if self.player.use_multi_stream and self.player.state == PlayerState.PLAYING:
-            await self.player.stop()
-            await asyncio.sleep(1)
         self._stream_url = self.mass.players.streams.get_stream_url(self.queue_id)
-        await self.player.play_url(self._stream_url)
+
+        if self.player.use_multi_stream:
+            # multi stream enabled, all child players should receive the same audio stream
+            # redirect command to all (powered) players
+            tasks = []
+            expected_clients = set()
+            for child_id in self.player.group_childs:
+                if child_player := self.mass.players.get_player(child_id):
+                    if child_player.powered:
+                        player_url = self.mass.players.streams.get_stream_url(
+                            self.queue_id, child_id
+                        )
+                        expected_clients.add(child_id)
+                        tasks.append(child_player.play_url(player_url))
+                        tasks.append(child_player.pause())
+            await self.mass.players.streams.start_multi_client_queue_stream(
+                self.queue_id, expected_clients
+            )
+            await asyncio.gather(*tasks)
+        else:
+            # regular (single player) request
+            await self.player.play_url(self._stream_url)
 
     async def move_item(self, queue_item_id: str, pos_shift: int = 1) -> None:
         """
@@ -532,7 +553,7 @@ class PlayerQueue:
         start_from_index = self._next_start_index
         try:
             next_item = self._items[start_from_index]
-        except IndexError as err:
+        except (IndexError, TypeError) as err:
             raise QueueEmpty() from err
         try:
             return await get_stream_details(self.mass, next_item, self.queue_id)