from music_assistant.constants import EventType
from music_assistant.helpers.audio import (
check_audio_support,
- create_wave_header,
crossfade_pcm_parts,
get_media_stream,
get_sox_args_for_pcm_stream,
) -> str:
"""Return the full stream url for the PlayerQueue Stream."""
if child_player:
- fmt = "wav"
return f"http://{self._ip}:{self._port}/{queue_id}/{child_player}.{fmt}"
return f"http://{self._ip}:{self._port}/{queue_id}.{fmt}"
"""Serve queue audio stream to multiple (group)clients in the raw PCM format."""
queue_id = request.match_info["queue_id"]
player_id = request.match_info["player_id"]
+ fmt = request.match_info.get("format", "flac")
queue = self.mass.players.get_player_queue(queue_id)
player = self.mass.players.get_player(player_id)
resp = web.StreamResponse(
status=200,
reason="OK",
- headers={
- "Content-Type": "audio/x-wav;codec=pcm;rate=44100;bitrate=16;channels=2"
- },
+ headers={"Content-Type": fmt},
)
await resp.prepare(request)
- # write wave header
- # multi subscriber queue is (currently) limited to 44100/16 format
- wav_header = create_wave_header(44100, 2, 16)
- await resp.write(wav_header)
-
# start delivering audio chunks
await self.subscribe_client(queue_id, player_id)
try:
task.cancel()
async def start_multi_client_queue_stream(
- self, queue_id: str, expected_clients: Set[str]
+ self, queue_id: str, expected_clients: Set[str], output_fmt: ContentType
) -> None:
"""Start the Queue stream feeding callbacks of listeners.."""
assert queue_id not in self._stream_tasks, "already running!"
# create queue for expected clients
self._client_queues.setdefault(queue_id, {})
for child_id in expected_clients:
- self._client_queues[queue_id][child_id] = asyncio.Queue(5)
+ self._client_queues[queue_id][child_id] = asyncio.Queue(10)
self._stream_tasks[queue_id] = asyncio.create_task(
- self.__multi_client_queue_stream_runner(queue_id)
+ self.__multi_client_queue_stream_runner(queue_id, output_fmt)
)
async def unpause_synced_start():
- # unpause if we got all clients (or timeout)
+ # unpause if we got all clients (or timeout) for (more or less) synced start
expected_clients = len(self._client_queues[queue_id])
time_started = self._time_started[queue_id]
while True:
while len(self._client_queues.get(queue_id, {})) != 0:
await asyncio.sleep(0.1)
- async def __multi_client_queue_stream_runner(self, queue_id: str):
+ async def __multi_client_queue_stream_runner(
+ self, queue_id: str, output_fmt: ContentType
+ ):
"""Distribute audio chunks over connected clients in a multi client queue stream."""
queue = self.mass.players.get_player_queue(queue_id)
- chunks_sent = 0
def cleanup_client_queue(player_id: str):
if client_queue := self._client_queues[queue_id].get(player_id, None):
client_queue.task_done()
client_queue.put_nowait(b"")
+ start_streamdetails = await queue.queue_stream_prepare()
+ sox_args = await get_sox_args_for_pcm_stream(
+ start_streamdetails.sample_rate,
+ start_streamdetails.bit_depth,
+ start_streamdetails.channels,
+ output_format=output_fmt,
+ )
self.logger.debug("Multi client queue stream %s started", queue.queue_id)
try:
- async for chunk in self._get_queue_stream(
- queue, 44100, 16, 2, resample=True
- ):
- chunks_sent += 1
- coros = []
- disconnected_clients = set()
- for player_id, client_queue in self._client_queues[queue_id].items():
- if (
- chunks_sent >= 4
- and player_id not in self._subscribers[queue_id]
- ):
- # assume client did not connect or got disconnected somehow
- disconnected_clients.add(player_id)
- else:
- coros.append(client_queue.put(chunk))
- await asyncio.gather(*coros)
- for player_id in disconnected_clients:
- cleanup_client_queue(player_id)
+ # get the raw pcm bytes from the queue stream and on the fly encode as to wanted format
+ # send the compressed/endoded stream to the client.
+ async with AsyncProcess(sox_args, True) as sox_proc:
+
+ async def writer():
+ # task that sends the raw pcm audio to the sox/ffmpeg process
+ async for audio_chunk in self._get_queue_stream(
+ queue,
+ sample_rate=start_streamdetails.sample_rate,
+ bit_depth=start_streamdetails.bit_depth,
+ channels=start_streamdetails.channels,
+ ):
+ if sox_proc.closed:
+ return
+ await sox_proc.write(audio_chunk)
+ # write eof when last packet is received
+ sox_proc.write_eof()
+
+ async def reader():
+ # read bytes from final output
+ chunks_sent = 0
+ chunksize = 32000 if output_fmt == ContentType.MP3 else 90000
+ async for chunk in sox_proc.iterate_chunks(chunksize):
+ chunks_sent += 1
+ coros = []
+ for player_id in list(self._client_queues[queue_id].keys()):
+ if (
+ chunks_sent >= 20
+ and player_id not in self._subscribers[queue_id]
+ ):
+ # assume client did not connect or got disconnected somehow
+ cleanup_client_queue(player_id)
+ self._client_queues[queue_id].pop(player_id, None)
+ else:
+ coros.append(
+ self._client_queues[queue_id][player_id].put(chunk)
+ )
+ await asyncio.gather(*coros)
+
+ await asyncio.gather(*[writer(), reader()])
# send empty chunk to inform EOF
await asyncio.gather(
*[cq.put(b"") for cq in self._client_queues[queue_id].values()]