From 55a86232dad2e01516b62e31abeb1bad32372810 Mon Sep 17 00:00:00 2001 From: Marcel van der Veldt Date: Fri, 8 Apr 2022 01:44:14 +0200 Subject: [PATCH] improve multi client streaming --- music_assistant/controllers/stream.py | 219 +++++++++++++++---------- music_assistant/models/player.py | 7 +- music_assistant/models/player_queue.py | 35 +++- 3 files changed, 164 insertions(+), 97 deletions(-) diff --git a/music_assistant/controllers/stream.py b/music_assistant/controllers/stream.py index 741acc81..ea6823b4 100644 --- a/music_assistant/controllers/stream.py +++ b/music_assistant/controllers/stream.py @@ -4,8 +4,7 @@ from __future__ import annotations import asyncio from asyncio import Task from time import time -from typing import AsyncGenerator, Awaitable, Callable, Dict, List -from uuid import uuid4 +from typing import AsyncGenerator, Dict, Optional, Set from aiohttp import web from music_assistant.constants import EventType @@ -35,28 +34,30 @@ class StreamController: self.logger = mass.logger.getChild("stream") self._port = port self._ip: str = get_ip() - self._subscribers: Dict[str, Dict[str, List[Callable]]] = {} + self._subscribers: Dict[str, Set[str]] = {} + self._client_queues: Dict[str, Dict[str, asyncio.Queue]] = {} self._stream_tasks: Dict[str, Task] = {} + self._time_started: Dict[str, float] = {} - def get_stream_url(self, queue_id: str) -> str: + def get_stream_url( + self, queue_id: str, child_player: Optional[str] = None, fmt: str = "flac" + ) -> str: """Return the full stream url for the PlayerQueue Stream.""" - checksum = str(int(time())) - queue = self.mass.players.get_player_queue(queue_id) - if queue.player.use_multi_stream: - return f"http://{self._ip}:{self._port}/multi/{queue_id}.wav?checksum={checksum}" - return f"http://{self._ip}:{self._port}/{queue_id}.flac?checksum={checksum}" + if child_player: + fmt = "wav" + return f"http://{self._ip}:{self._port}/{queue_id}/{child_player}.{fmt}" + return f"http://{self._ip}:{self._port}/{queue_id}.{fmt}" async def setup(self) -> None: """Async initialize of module.""" app = web.Application() app.router.add_get( - "/multi/{queue_id}.wav", self.serve_multi_client_queue_stream - ) - app.router.add_get( - "/{queue_id}.{format}", self.serve_queue_stream, allow_head=True + "/{queue_id}/{player_id}.{format}", + self.serve_multi_client_queue_stream, ) - app.router.add_get("/{queue_id}", self.serve_queue_stream, allow_head=True) + app.router.add_get("/{queue_id}.{format}", self.serve_queue_stream) + app.router.add_get("/{queue_id}", self.serve_queue_stream) runner = web.AppRunner(app, access_log=None) await runner.setup() @@ -103,8 +104,6 @@ class StreamController: status=200, reason="OK", headers={"Content-Type": f"audio/{fmt}"} ) await resp.prepare(request) - if request.method == "HEAD": - return start_streamdetails = await queue.queue_stream_prepare() output_fmt = ContentType(fmt) @@ -144,10 +143,11 @@ class StreamController: async def serve_multi_client_queue_stream(self, request: web.Request): """Serve queue audio stream to multiple (group)clients in the raw PCM format.""" queue_id = request.match_info["queue_id"] + player_id = request.match_info["player_id"] queue = self.mass.players.get_player_queue(queue_id) - client_id = str(uuid4()) + player = self.mass.players.get_player(player_id) - if queue is None: + if queue is None or player is None: return web.Response(status=404) # prepare request @@ -159,8 +159,6 @@ class StreamController: }, ) await resp.prepare(request) - if request.method == "HEAD": - return resp # write wave header # multi subscriber queue is (currently) limited to 44100/16 format @@ -168,60 +166,39 @@ class StreamController: await resp.write(wav_header) # start delivering audio chunks - last_chunk_received = asyncio.Event() + await self.subscribe_client(queue_id, player_id) try: - - async def audio_callback(audio_chunk): + while True: + audio_chunk = await self._client_queues[queue_id][player_id].get() + await resp.write(audio_chunk) + self._client_queues[queue_id][player_id].task_done() if audio_chunk == b"": - last_chunk_received.set() - return - try: - await resp.write(audio_chunk) - except BrokenPipeError: - pass # race condition - - await self.subscribe_client(queue_id, client_id, audio_callback) - await last_chunk_received.wait() + # last chunk + break finally: - await self.unsubscribe_client(queue_id, client_id) + await self.unsubscribe_client(queue_id, player_id) return resp - async def subscribe_client( - self, queue_id: str, client_id: str, callback: Awaitable - ) -> None: + async def subscribe_client(self, queue_id: str, player_id: str) -> None: """Subscribe client to queue stream.""" if queue_id not in self._subscribers: - self._subscribers[queue_id] = {} - self._subscribers[queue_id][client_id] = callback - - queue = self.mass.players.get_player_queue(queue_id) - # calculate number of expected clients - expected_clients = 0 - for child_id in queue.player.group_childs: - if player := self.mass.players.get_player(child_id): - if player.powered: - expected_clients += 1 - assert expected_clients, "No clients expected for this stream" - - stream_task = self._stream_tasks.get(queue_id) - # we start the stream as soon as we've reached the expected number of clients - # TODO: add timeout guard just in case we don't reach the number of expected client - if stream_task is None and len(self._subscribers[queue_id]) >= expected_clients: - # start the stream task - await asyncio.sleep(1) # relax - await self.start_multi_queue_stream(queue_id) + self._subscribers[queue_id] = set() + self._subscribers[queue_id].add(player_id) self.logger.debug( - "Subscribed client %s to multi queue stream %s", - client_id, - queue.queue_id, + "Subscribed player %s to multi queue stream %s", + player_id, + queue_id, ) - async def unsubscribe_client(self, queue_id: str, clientid: str): + async def unsubscribe_client(self, queue_id: str, player_id: str): """Unsubscribe client from queue stream.""" - self._subscribers[queue_id].pop(clientid, None) + if player_id in self._subscribers[queue_id]: + self._subscribers[queue_id].remove(player_id) + + self._client_queues.get(queue_id, {}).pop(player_id, None) self.logger.debug( - "Unsubscribed client %s from multi queue stream %s", clientid, queue_id + "Unsubscribed player %s from multi queue stream %s", player_id, queue_id ) if len(self._subscribers[queue_id]) == 0: # no more clients, cancel stream task @@ -231,44 +208,111 @@ class StreamController: ) task.cancel() - async def start_multi_queue_stream(self, queue_id: str) -> None: + async def start_multi_client_queue_stream( + self, queue_id: str, expected_clients: Set[str] + ) -> None: """Start the Queue stream feeding callbacks of listeners..""" - queue = self.mass.players.get_player_queue(queue_id) assert queue_id not in self._stream_tasks, "already running!" - async def queue_task(): - self.logger.debug("Multi client queue stream %s started", queue.queue_id) - try: - async for chunk in self._get_queue_stream( - queue, 44100, 16, 2, resample=True - ): - if len(self._subscribers[queue_id].values()) == 0: - # just in case of race conditions - return + self._time_started[queue_id] = time() + + # create queue for expected clients + self._client_queues.setdefault(queue_id, {}) + for child_id in expected_clients: + self._client_queues[queue_id][child_id] = asyncio.Queue(5) + + self._stream_tasks[queue_id] = asyncio.create_task( + self.__multi_client_queue_stream_runner(queue_id) + ) + + async def unpause_synced_start(): + # unpause if we got all clients (or timeout) + expected_clients = len(self._client_queues[queue_id]) + time_started = self._time_started[queue_id] + while True: + await asyncio.sleep(0.1) + if (len(self._subscribers) >= expected_clients) or ( + time() - time_started + ) > 5: await asyncio.gather( *[ - cb(chunk) - for cb in list(self._subscribers[queue_id].values()) + self.mass.players.get_player(client_id).play() + for client_id in self._subscribers.get(queue_id, {}) ] ) - finally: - self._stream_tasks.pop(queue_id, None) - # send empty chunk to inform EOF - await asyncio.gather( - *[cb(b"") for cb in list(self._subscribers[queue_id].values())] - ) - self.logger.debug("Multi client queue stream %s ended", queue.queue_id) + break - self._stream_tasks[queue_id] = asyncio.create_task(queue_task()) + self.mass.create_task(unpause_synced_start) - async def stop_multi_queue_stream(self, queue_id: str) -> None: + async def stop_multi_client_queue_stream(self, queue_id: str) -> None: """Signal a running queue stream task and its listeners to stop.""" if queue_id not in self._stream_tasks: return + # send stop to child players + await asyncio.gather( + *[ + self.mass.players.get_player(client_id).stop() + for client_id in self._subscribers.get(queue_id, {}) + ] + ) + + # stop background task if stream_task := self._stream_tasks.pop(queue_id, None): stream_task.cancel() - await stream_task + + # wait for cleanup + while len(self._subscribers.get(queue_id, {})) != 0: + await asyncio.sleep(0.1) + while len(self._client_queues.get(queue_id, {})) != 0: + await asyncio.sleep(0.1) + + async def __multi_client_queue_stream_runner(self, queue_id: str): + """Distribute audio chunks over connected clients in a multi client queue stream.""" + queue = self.mass.players.get_player_queue(queue_id) + chunks_sent = 0 + + def cleanup_client_queue(player_id: str): + if client_queue := self._client_queues[queue_id].get(player_id, None): + self.logger.debug("cleaning up child queue %s", player_id) + for _ in range(client_queue.qsize()): + client_queue.get_nowait() + client_queue.task_done() + client_queue.put_nowait(b"") + + self.logger.debug("Multi client queue stream %s started", queue.queue_id) + try: + async for chunk in self._get_queue_stream( + queue, 44100, 16, 2, resample=True + ): + chunks_sent += 1 + coros = [] + disconnected_clients = set() + for player_id, client_queue in self._client_queues[queue_id].items(): + if ( + chunks_sent >= 4 + and player_id not in self._subscribers[queue_id] + ): + # assume client did not connect or got disconnected somehow + disconnected_clients.add(player_id) + else: + coros.append(client_queue.put(chunk)) + await asyncio.gather(*coros) + for player_id in disconnected_clients: + cleanup_client_queue(player_id) + + # send empty chunk to inform EOF + await asyncio.gather( + *[cq.put(b"") for cq in self._client_queues[queue_id].values()] + ) + + finally: + # cleanup + self._stream_tasks.pop(queue_id, None) + for player_id in list(self._client_queues[queue_id].keys()): + cleanup_client_queue(player_id) + + self.logger.debug("Multi client queue stream %s ended", queue.queue_id) async def _get_queue_stream( self, @@ -455,10 +499,9 @@ class StreamController: del chunk # guard for clients buffering too much seconds_streamed = bytes_written / sample_size - seconds_per_chunk = buffer_size / sample_size - seconds_needed = int(time() - start_timestamp + seconds_per_chunk) - if (seconds_streamed) > seconds_needed: - await asyncio.sleep(seconds_per_chunk / 2) + seconds_needed = int(time() - start_timestamp) + if (seconds_streamed - seconds_needed) >= 10: + await asyncio.sleep(8) # end of the track reached # update actual duration to the queue for more accurate now playing info accurate_duration = bytes_written / sample_size diff --git a/music_assistant/models/player.py b/music_assistant/models/player.py index 3402f210..6da267c1 100755 --- a/music_assistant/models/player.py +++ b/music_assistant/models/player.py @@ -274,11 +274,14 @@ class Player(ABC): player = self.mass.players.get_player(player_id) if not player or not player.powered: continue + # if player.state not in [PlayerState.PLAYING, PlayerState.PAUSED]: + # continue queue = self.mass.players.get_player_queue(player_id) if not queue or not queue.active: continue - # match found! - return queue.queue_id + if queue.queue_id in player.current_url: + # match found! + return queue.queue_id return self.player_id def to_dict(self) -> Dict[str, Any]: diff --git a/music_assistant/models/player_queue.py b/music_assistant/models/player_queue.py index 0e15bfba..63cc47df 100644 --- a/music_assistant/models/player_queue.py +++ b/music_assistant/models/player_queue.py @@ -90,7 +90,7 @@ class PlayerQueue: self._update_task: Task = None self._signal_next: bool = False self._last_player_update: int = 0 - self._stream_url: Optional[str] = None + self._stream_url: str = self.mass.players.streams.get_stream_url(self.queue_id) async def setup(self) -> None: """Handle async setup of instance.""" @@ -362,6 +362,10 @@ class PlayerQueue: async def play_index(self, index: Union[int, str]) -> None: """Play item at index (or item_id) X in queue.""" + if self.player.use_multi_stream: + await self.mass.players.streams.stop_multi_client_queue_stream( + self.queue_id + ) if not isinstance(index, int): index = self.index_by_id(index) if index is None: @@ -370,13 +374,30 @@ class PlayerQueue: return self._current_index = index self._next_start_index = index - # send stream url to player connected to this queue - if self.player.use_multi_stream and self.player.state == PlayerState.PLAYING: - await self.player.stop() - await asyncio.sleep(1) self._stream_url = self.mass.players.streams.get_stream_url(self.queue_id) - await self.player.play_url(self._stream_url) + + if self.player.use_multi_stream: + # multi stream enabled, all child players should receive the same audio stream + # redirect command to all (powered) players + tasks = [] + expected_clients = set() + for child_id in self.player.group_childs: + if child_player := self.mass.players.get_player(child_id): + if child_player.powered: + player_url = self.mass.players.streams.get_stream_url( + self.queue_id, child_id + ) + expected_clients.add(child_id) + tasks.append(child_player.play_url(player_url)) + tasks.append(child_player.pause()) + await self.mass.players.streams.start_multi_client_queue_stream( + self.queue_id, expected_clients + ) + await asyncio.gather(*tasks) + else: + # regular (single player) request + await self.player.play_url(self._stream_url) async def move_item(self, queue_item_id: str, pos_shift: int = 1) -> None: """ @@ -532,7 +553,7 @@ class PlayerQueue: start_from_index = self._next_start_index try: next_item = self._items[start_from_index] - except IndexError as err: + except (IndexError, TypeError) as err: raise QueueEmpty() from err try: return await get_stream_details(self.mass, next_item, self.queue_id) -- 2.34.1