import asyncio
from asyncio import Task
from time import time
-from typing import AsyncGenerator, Awaitable, Callable, Dict, List
-from uuid import uuid4
+from typing import AsyncGenerator, Dict, Optional, Set
from aiohttp import web
from music_assistant.constants import EventType
self.logger = mass.logger.getChild("stream")
self._port = port
self._ip: str = get_ip()
- self._subscribers: Dict[str, Dict[str, List[Callable]]] = {}
+ self._subscribers: Dict[str, Set[str]] = {}
+ self._client_queues: Dict[str, Dict[str, asyncio.Queue]] = {}
self._stream_tasks: Dict[str, Task] = {}
+ self._time_started: Dict[str, float] = {}
- def get_stream_url(self, queue_id: str) -> str:
+ def get_stream_url(
+ self, queue_id: str, child_player: Optional[str] = None, fmt: str = "flac"
+ ) -> str:
"""Return the full stream url for the PlayerQueue Stream."""
- checksum = str(int(time()))
- queue = self.mass.players.get_player_queue(queue_id)
- if queue.player.use_multi_stream:
- return f"http://{self._ip}:{self._port}/multi/{queue_id}.wav?checksum={checksum}"
- return f"http://{self._ip}:{self._port}/{queue_id}.flac?checksum={checksum}"
+ if child_player:
+ fmt = "wav"
+ return f"http://{self._ip}:{self._port}/{queue_id}/{child_player}.{fmt}"
+ return f"http://{self._ip}:{self._port}/{queue_id}.{fmt}"
async def setup(self) -> None:
"""Async initialize of module."""
app = web.Application()
app.router.add_get(
- "/multi/{queue_id}.wav", self.serve_multi_client_queue_stream
- )
- app.router.add_get(
- "/{queue_id}.{format}", self.serve_queue_stream, allow_head=True
+ "/{queue_id}/{player_id}.{format}",
+ self.serve_multi_client_queue_stream,
)
- app.router.add_get("/{queue_id}", self.serve_queue_stream, allow_head=True)
+ app.router.add_get("/{queue_id}.{format}", self.serve_queue_stream)
+ app.router.add_get("/{queue_id}", self.serve_queue_stream)
runner = web.AppRunner(app, access_log=None)
await runner.setup()
status=200, reason="OK", headers={"Content-Type": f"audio/{fmt}"}
)
await resp.prepare(request)
- if request.method == "HEAD":
- return
start_streamdetails = await queue.queue_stream_prepare()
output_fmt = ContentType(fmt)
async def serve_multi_client_queue_stream(self, request: web.Request):
"""Serve queue audio stream to multiple (group)clients in the raw PCM format."""
queue_id = request.match_info["queue_id"]
+ player_id = request.match_info["player_id"]
queue = self.mass.players.get_player_queue(queue_id)
- client_id = str(uuid4())
+ player = self.mass.players.get_player(player_id)
- if queue is None:
+ if queue is None or player is None:
return web.Response(status=404)
# prepare request
},
)
await resp.prepare(request)
- if request.method == "HEAD":
- return resp
# write wave header
# multi subscriber queue is (currently) limited to 44100/16 format
await resp.write(wav_header)
# start delivering audio chunks
- last_chunk_received = asyncio.Event()
+ await self.subscribe_client(queue_id, player_id)
try:
-
- async def audio_callback(audio_chunk):
+ while True:
+ audio_chunk = await self._client_queues[queue_id][player_id].get()
+ await resp.write(audio_chunk)
+ self._client_queues[queue_id][player_id].task_done()
if audio_chunk == b"":
- last_chunk_received.set()
- return
- try:
- await resp.write(audio_chunk)
- except BrokenPipeError:
- pass # race condition
-
- await self.subscribe_client(queue_id, client_id, audio_callback)
- await last_chunk_received.wait()
+ # last chunk
+ break
finally:
- await self.unsubscribe_client(queue_id, client_id)
+ await self.unsubscribe_client(queue_id, player_id)
return resp
- async def subscribe_client(
- self, queue_id: str, client_id: str, callback: Awaitable
- ) -> None:
+ async def subscribe_client(self, queue_id: str, player_id: str) -> None:
"""Subscribe client to queue stream."""
if queue_id not in self._subscribers:
- self._subscribers[queue_id] = {}
- self._subscribers[queue_id][client_id] = callback
-
- queue = self.mass.players.get_player_queue(queue_id)
- # calculate number of expected clients
- expected_clients = 0
- for child_id in queue.player.group_childs:
- if player := self.mass.players.get_player(child_id):
- if player.powered:
- expected_clients += 1
- assert expected_clients, "No clients expected for this stream"
-
- stream_task = self._stream_tasks.get(queue_id)
- # we start the stream as soon as we've reached the expected number of clients
- # TODO: add timeout guard just in case we don't reach the number of expected client
- if stream_task is None and len(self._subscribers[queue_id]) >= expected_clients:
- # start the stream task
- await asyncio.sleep(1) # relax
- await self.start_multi_queue_stream(queue_id)
+ self._subscribers[queue_id] = set()
+ self._subscribers[queue_id].add(player_id)
self.logger.debug(
- "Subscribed client %s to multi queue stream %s",
- client_id,
- queue.queue_id,
+ "Subscribed player %s to multi queue stream %s",
+ player_id,
+ queue_id,
)
- async def unsubscribe_client(self, queue_id: str, clientid: str):
+ async def unsubscribe_client(self, queue_id: str, player_id: str):
"""Unsubscribe client from queue stream."""
- self._subscribers[queue_id].pop(clientid, None)
+ if player_id in self._subscribers[queue_id]:
+ self._subscribers[queue_id].remove(player_id)
+
+ self._client_queues.get(queue_id, {}).pop(player_id, None)
self.logger.debug(
- "Unsubscribed client %s from multi queue stream %s", clientid, queue_id
+ "Unsubscribed player %s from multi queue stream %s", player_id, queue_id
)
if len(self._subscribers[queue_id]) == 0:
# no more clients, cancel stream task
)
task.cancel()
- async def start_multi_queue_stream(self, queue_id: str) -> None:
+ async def start_multi_client_queue_stream(
+ self, queue_id: str, expected_clients: Set[str]
+ ) -> None:
"""Start the Queue stream feeding callbacks of listeners.."""
- queue = self.mass.players.get_player_queue(queue_id)
assert queue_id not in self._stream_tasks, "already running!"
- async def queue_task():
- self.logger.debug("Multi client queue stream %s started", queue.queue_id)
- try:
- async for chunk in self._get_queue_stream(
- queue, 44100, 16, 2, resample=True
- ):
- if len(self._subscribers[queue_id].values()) == 0:
- # just in case of race conditions
- return
+ self._time_started[queue_id] = time()
+
+ # create queue for expected clients
+ self._client_queues.setdefault(queue_id, {})
+ for child_id in expected_clients:
+ self._client_queues[queue_id][child_id] = asyncio.Queue(5)
+
+ self._stream_tasks[queue_id] = asyncio.create_task(
+ self.__multi_client_queue_stream_runner(queue_id)
+ )
+
+ async def unpause_synced_start():
+ # unpause if we got all clients (or timeout)
+ expected_clients = len(self._client_queues[queue_id])
+ time_started = self._time_started[queue_id]
+ while True:
+ await asyncio.sleep(0.1)
+ if (len(self._subscribers) >= expected_clients) or (
+ time() - time_started
+ ) > 5:
await asyncio.gather(
*[
- cb(chunk)
- for cb in list(self._subscribers[queue_id].values())
+ self.mass.players.get_player(client_id).play()
+ for client_id in self._subscribers.get(queue_id, {})
]
)
- finally:
- self._stream_tasks.pop(queue_id, None)
- # send empty chunk to inform EOF
- await asyncio.gather(
- *[cb(b"") for cb in list(self._subscribers[queue_id].values())]
- )
- self.logger.debug("Multi client queue stream %s ended", queue.queue_id)
+ break
- self._stream_tasks[queue_id] = asyncio.create_task(queue_task())
+ self.mass.create_task(unpause_synced_start)
- async def stop_multi_queue_stream(self, queue_id: str) -> None:
+ async def stop_multi_client_queue_stream(self, queue_id: str) -> None:
"""Signal a running queue stream task and its listeners to stop."""
if queue_id not in self._stream_tasks:
return
+ # send stop to child players
+ await asyncio.gather(
+ *[
+ self.mass.players.get_player(client_id).stop()
+ for client_id in self._subscribers.get(queue_id, {})
+ ]
+ )
+
+ # stop background task
if stream_task := self._stream_tasks.pop(queue_id, None):
stream_task.cancel()
- await stream_task
+
+ # wait for cleanup
+ while len(self._subscribers.get(queue_id, {})) != 0:
+ await asyncio.sleep(0.1)
+ while len(self._client_queues.get(queue_id, {})) != 0:
+ await asyncio.sleep(0.1)
+
+ async def __multi_client_queue_stream_runner(self, queue_id: str):
+ """Distribute audio chunks over connected clients in a multi client queue stream."""
+ queue = self.mass.players.get_player_queue(queue_id)
+ chunks_sent = 0
+
+ def cleanup_client_queue(player_id: str):
+ if client_queue := self._client_queues[queue_id].get(player_id, None):
+ self.logger.debug("cleaning up child queue %s", player_id)
+ for _ in range(client_queue.qsize()):
+ client_queue.get_nowait()
+ client_queue.task_done()
+ client_queue.put_nowait(b"")
+
+ self.logger.debug("Multi client queue stream %s started", queue.queue_id)
+ try:
+ async for chunk in self._get_queue_stream(
+ queue, 44100, 16, 2, resample=True
+ ):
+ chunks_sent += 1
+ coros = []
+ disconnected_clients = set()
+ for player_id, client_queue in self._client_queues[queue_id].items():
+ if (
+ chunks_sent >= 4
+ and player_id not in self._subscribers[queue_id]
+ ):
+ # assume client did not connect or got disconnected somehow
+ disconnected_clients.add(player_id)
+ else:
+ coros.append(client_queue.put(chunk))
+ await asyncio.gather(*coros)
+ for player_id in disconnected_clients:
+ cleanup_client_queue(player_id)
+
+ # send empty chunk to inform EOF
+ await asyncio.gather(
+ *[cq.put(b"") for cq in self._client_queues[queue_id].values()]
+ )
+
+ finally:
+ # cleanup
+ self._stream_tasks.pop(queue_id, None)
+ for player_id in list(self._client_queues[queue_id].keys()):
+ cleanup_client_queue(player_id)
+
+ self.logger.debug("Multi client queue stream %s ended", queue.queue_id)
async def _get_queue_stream(
self,
del chunk
# guard for clients buffering too much
seconds_streamed = bytes_written / sample_size
- seconds_per_chunk = buffer_size / sample_size
- seconds_needed = int(time() - start_timestamp + seconds_per_chunk)
- if (seconds_streamed) > seconds_needed:
- await asyncio.sleep(seconds_per_chunk / 2)
+ seconds_needed = int(time() - start_timestamp)
+ if (seconds_streamed - seconds_needed) >= 10:
+ await asyncio.sleep(8)
# end of the track reached
# update actual duration to the queue for more accurate now playing info
accurate_duration = bytes_written / sample_size
self._update_task: Task = None
self._signal_next: bool = False
self._last_player_update: int = 0
- self._stream_url: Optional[str] = None
+ self._stream_url: str = self.mass.players.streams.get_stream_url(self.queue_id)
async def setup(self) -> None:
"""Handle async setup of instance."""
async def play_index(self, index: Union[int, str]) -> None:
"""Play item at index (or item_id) X in queue."""
+ if self.player.use_multi_stream:
+ await self.mass.players.streams.stop_multi_client_queue_stream(
+ self.queue_id
+ )
if not isinstance(index, int):
index = self.index_by_id(index)
if index is None:
return
self._current_index = index
self._next_start_index = index
-
# send stream url to player connected to this queue
- if self.player.use_multi_stream and self.player.state == PlayerState.PLAYING:
- await self.player.stop()
- await asyncio.sleep(1)
self._stream_url = self.mass.players.streams.get_stream_url(self.queue_id)
- await self.player.play_url(self._stream_url)
+
+ if self.player.use_multi_stream:
+ # multi stream enabled, all child players should receive the same audio stream
+ # redirect command to all (powered) players
+ tasks = []
+ expected_clients = set()
+ for child_id in self.player.group_childs:
+ if child_player := self.mass.players.get_player(child_id):
+ if child_player.powered:
+ player_url = self.mass.players.streams.get_stream_url(
+ self.queue_id, child_id
+ )
+ expected_clients.add(child_id)
+ tasks.append(child_player.play_url(player_url))
+ tasks.append(child_player.pause())
+ await self.mass.players.streams.start_multi_client_queue_stream(
+ self.queue_id, expected_clients
+ )
+ await asyncio.gather(*tasks)
+ else:
+ # regular (single player) request
+ await self.player.play_url(self._stream_url)
async def move_item(self, queue_item_id: str, pos_shift: int = 1) -> None:
"""
start_from_index = self._next_start_index
try:
next_item = self._items[start_from_index]
- except IndexError as err:
+ except (IndexError, TypeError) as err:
raise QueueEmpty() from err
try:
return await get_stream_details(self.mass, next_item, self.queue_id)