import asyncio
from asyncio import Task
-from dataclasses import dataclass
from time import time
from typing import AsyncGenerator, Awaitable, Callable, Dict, List
from uuid import uuid4
)
from music_assistant.helpers.process import AsyncProcess
from music_assistant.helpers.typing import MusicAssistant
-from music_assistant.helpers.util import get_ip
+from music_assistant.helpers.util import create_task, get_ip
from music_assistant.models.errors import MediaNotFoundError
from music_assistant.models.media_items import ContentType
from music_assistant.models.player_queue import PlayerQueue
-@dataclass(frozen=True)
-class PCMArgs:
- """Specify raw pcm audio."""
-
- sample_rate: int
- bit_depth: int
- channels: int
-
-
class StreamController:
"""Controller to stream audio to players."""
self._ip: str = get_ip()
self._subscribers: Dict[str, Dict[str, List[Callable]]] = {}
self._stream_tasks: Dict[str, Task] = {}
- self._pcmargs: Dict[str, PCMArgs] = {}
def get_stream_url(self, queue_id: str) -> str:
"""Return the full stream url for the PlayerQueue Stream."""
+ queue = self.mass.players.get_player_queue(queue_id)
+ if queue.player.use_multi_stream:
+ return f"http://{self._ip}:{self._port}/multi/{queue_id}.wav"
return f"http://{self._ip}:{self._port}/{queue_id}.flac"
async def setup(self) -> None:
"""Async initialize of module."""
app = web.Application()
- app.router.add_get("/{queue_id}.wav", self.serve_stream_client_pcm)
- app.router.add_get("/{queue_id}.{format}", self.serve_stream_client)
- app.router.add_get("/{queue_id}", self.serve_stream_client)
+ app.router.add_get(
+ "/multi/{queue_id}.wav", self.serve_multi_client_queue_stream
+ )
+ app.router.add_get("/{queue_id}.{format}", self.serve_queue_stream)
+ app.router.add_get("/{queue_id}", self.serve_queue_stream)
runner = web.AppRunner(app, access_log=None)
await runner.setup()
self.logger.info("Started stream server on port %s", self._port)
- async def serve_stream_client(self, request: web.Request):
- """Serve queue audio stream to client (encoded to FLAC or MP3)."""
+ async def serve_queue_stream(self, request: web.Request):
+ """Serve queue audio stream to a single player (encoded to fileformat of choice)."""
queue_id = request.match_info["queue_id"]
- clientid = f'{request.remote}_{request.query.get("playerid", str(uuid4()))}'
fmt = request.match_info.get("format", "flac")
+ queue = self.mass.players.get_player_queue(queue_id)
- if self.mass.players.get_player_queue(queue_id) is None:
+ if queue is None:
return web.Response(status=404)
# prepare request
)
await resp.prepare(request)
- pcmargs = await self._get_queue_stream_pcm_args(queue_id)
+ start_streamdetails = await queue.queue_stream_prepare()
output_fmt = ContentType(fmt)
sox_args = await get_sox_args_for_pcm_stream(
- pcmargs.sample_rate,
- pcmargs.bit_depth,
- pcmargs.channels,
+ start_streamdetails.sample_rate,
+ start_streamdetails.bit_depth,
+ start_streamdetails.channels,
output_format=output_fmt,
)
- try:
- # get the raw pcm bytes from the queue stream and on the fly encode as flac
- # send the flac endoded stream to the subscribers.
- async with AsyncProcess(sox_args, True) as sox_proc:
-
- async def reader():
- # task that reads flac endoded chunks from the subprocess
- chunksize = 32000 if output_fmt == ContentType.MP3 else 256000
- async for audio_chunk in sox_proc.iterate_chunks(chunksize):
- await resp.write(audio_chunk)
-
- # feed raw pcm chunks into sox/ffmpeg to encode to flac
- async def audio_callback(audio_chunk):
- if audio_chunk == b"":
- sox_proc.write_eof()
+ # get the raw pcm bytes from the queue stream and on the fly encode as to wanted format
+ # send the compressed/endoded stream to the client.
+ async with AsyncProcess(sox_args, True) as sox_proc:
+
+ async def writer():
+ # task that sends the raw pcm audio to the sox/ffmpeg process
+ async for audio_chunk in self._get_queue_stream(
+ queue,
+ sample_rate=start_streamdetails.sample_rate,
+ bit_depth=start_streamdetails.bit_depth,
+ channels=start_streamdetails.channels,
+ ):
+ if sox_proc.closed:
return
await sox_proc.write(audio_chunk)
+ # write eof when last packet is received
+ sox_proc.write_eof()
- # wait for the output task to complete
- await self.subscribe(queue_id, clientid, audio_callback)
- await reader()
+ create_task(writer)
+
+ # read bytes from final output
+ chunksize = 32000 if output_fmt == ContentType.MP3 else 90000
+ async for audio_chunk in sox_proc.iterate_chunks(chunksize):
+ await resp.write(audio_chunk)
- finally:
- await self.unsubscribe(queue_id, clientid)
return resp
- async def serve_stream_client_pcm(self, request: web.Request):
- """Serve queue audio stream to client in the raw PCM format."""
+ async def serve_multi_client_queue_stream(self, request: web.Request):
+ """Serve queue audio stream to multiple (group)clients in the raw PCM format."""
queue_id = request.match_info["queue_id"]
queue = self.mass.players.get_player_queue(queue_id)
- clientid = f'{request.remote}_{request.query.get("playerid", str(uuid4()))}'
+ client_id = str(uuid4())
if queue is None:
return web.Response(status=404)
# prepare request
- pcmargs = await self._get_queue_stream_pcm_args(queue_id, 32)
- fmt = f"x-wav;codec=pcm;rate={pcmargs.sample_rate};bitrate={pcmargs.bit_depth};channels={pcmargs.channels}"
resp = web.StreamResponse(
status=200,
reason="OK",
- headers={"Content-Type": f"audio/{fmt}"},
+ headers={
+ "Content-Type": "audio/x-wav;codec=pcm;rate=44100;bitrate=16;channels=2"
+ },
)
await resp.prepare(request)
# write wave header
- wav_header = create_wave_header(
- pcmargs.sample_rate,
- pcmargs.channels,
- pcmargs.bit_depth,
- )
+ # multi subscriber queue is (currently) limited to 44100/16 format
+ wav_header = create_wave_header(44100, 2, 16)
await resp.write(wav_header)
# start delivering audio chunks
except BrokenPipeError:
pass # race condition
- await self.subscribe(queue_id, clientid, audio_callback)
+ await self.subscribe(queue_id, client_id, audio_callback)
await last_chunk_received.wait()
finally:
- await self.unsubscribe(queue_id, clientid)
+ await self.unsubscribe(queue_id, client_id)
return resp
async def subscribe(
- self, queue_id: str, clientid: str, callback: Awaitable
+ self, queue_id: str, client_id: str, callback: Awaitable
) -> None:
"""Subscribe client to queue stream."""
- self._subscribers.setdefault(queue_id, {})
- if queue_id in self._subscribers[queue_id]:
- # client is already subscribed ?
- await self.unsubscribe(queue_id, clientid)
- self._subscribers[queue_id][clientid] = callback
+ if queue_id not in self._subscribers:
+ self._subscribers[queue_id] = {}
+ self._subscribers[queue_id][client_id] = callback
+
+ queue = self.mass.players.get_player_queue(queue_id)
+ # calculate number of expected clients
+ expected_clients = 0
+ for child_id in queue.player.group_childs:
+ if player := self.mass.players.get_player(child_id):
+ if player.powered:
+ expected_clients += 1
+ assert expected_clients, "No clients expected for this stream"
+
stream_task = self._stream_tasks.get(queue_id)
- if not stream_task or stream_task.cancelled():
- # first connect, start the stream task
- task = asyncio.create_task(self.start_queue_stream(queue_id))
+ if stream_task is not None:
+ # a new client connected while we're already streaming, tell the queue to restart
+ stream_task.cancel()
+ await queue.resume()
+ return
+ # we start the stream as soon as we've reached the expected number of clients
+ # TODO: add timeout guard just in case we don't reach the number of expected client
+ if stream_task is None and len(self._subscribers[queue_id]) >= expected_clients:
+ # start the stream task
+ self._stream_tasks[queue_id] = task = asyncio.create_task(
+ self.start_multi_queue_stream(queue_id)
+ )
+ self.logger.debug("Multi client queue stream %s started", queue.queue_id)
def task_done_callback(*args, **kwargs):
self._stream_tasks.pop(queue_id, None)
task.add_done_callback(task_done_callback)
- self._stream_tasks[queue_id] = task
- self.logger.debug("Subscribed client %s to queue stream %s", clientid, queue_id)
+ self.logger.debug(
+ "Subscribed client %s to multi queue stream %s",
+ client_id,
+ queue.queue_id,
+ )
+ return client_id
async def unsubscribe(self, queue_id: str, clientid: str):
"""Unsubscribe client from queue stream."""
self._subscribers[queue_id].pop(clientid, None)
self.logger.debug(
- "Unsubscribed client %s from queue stream %s", clientid, queue_id
+ "Unsubscribed client %s from multi queue stream %s", clientid, queue_id
)
if len(self._subscribers[queue_id]) == 0:
# no more clients, cancel stream task
- self.logger.debug(
- "Aborted queue stream %s due to no more clients", queue_id
- )
if task := self._stream_tasks.pop(queue_id, None):
+ self.logger.debug(
+ "Aborted multi queue stream %s due to no more clients", queue_id
+ )
task.cancel()
- self._pcmargs.pop(queue_id, None)
- async def start_queue_stream(self, queue_id: str) -> None:
+ async def start_multi_queue_stream(self, queue_id: str) -> None:
"""Start the Queue stream feeding callbacks of listeners.."""
queue = self.mass.players.get_player_queue(queue_id)
- pcmargs = await self._get_queue_stream_pcm_args(queue_id)
-
- self.logger.info(
- "Starting Queue stream for Queue %s with args: %s", queue_id, pcmargs
- )
- async for chunk in self._get_queue_stream(
- queue,
- pcmargs.sample_rate,
- pcmargs.bit_depth,
- pcmargs.channels,
- ):
+ async for chunk in self._get_queue_stream(queue, 44100, 16, 2, resample=True):
if len(self._subscribers[queue_id].values()) == 0:
- self.logger.info("Queue stream for Queue %s aborted", queue_id)
+ # just in case of race conditions
return
await asyncio.gather(
*[cb(chunk) for cb in list(self._subscribers[queue_id].values())]
)
- self.logger.info("Queue stream for Queue %s finished.", queue_id)
# send empty chunk to inform EOF
await asyncio.gather(
*[cb(b"") for cb in list(self._subscribers[queue_id].values())]
)
- async def _get_queue_stream_pcm_args(
- self, queue_id: str, forced_bit_depth: int = None
- ) -> PCMArgs:
- """Return the current/ext PCM args for the queue stream."""
- if queue_id in self._pcmargs:
- return self._pcmargs[queue_id]
- queue = self.mass.players.get_player_queue(queue_id)
- next_streamdetails = await queue.queue_stream_prepare()
- pcmargs = PCMArgs(
- sample_rate=min(next_streamdetails.sample_rate, queue.max_sample_rate),
- bit_depth=forced_bit_depth or next_streamdetails.bit_depth,
- channels=2,
- )
- self._pcmargs[queue_id] = pcmargs
- return pcmargs
-
async def _get_queue_stream(
- self, queue: PlayerQueue, sample_rate: int, bit_depth: int, channels: int = 2
+ self,
+ queue: PlayerQueue,
+ sample_rate: int,
+ bit_depth: int,
+ channels: int = 2,
+ resample: bool = False,
) -> AsyncGenerator[None, bytes]:
"""Stream the PlayerQueue's tracks as constant feed of PCM raw audio."""
last_fadeout_data = b""
track_count = 0
start_timestamp = time()
+ pcm_fmt = ContentType.from_bit_depth(bit_depth)
+ self.logger.info(
+ "Starting Queue audio stream for Queue %s (PCM format: %s - sample rate: %s)",
+ queue.player.name,
+ pcm_fmt,
+ sample_rate,
+ )
+
# stream queue tracks one by one
while True:
# get the (next) track in queue
self.logger.warning("Skip track due to missing streamdetails")
continue
- # get the PCM samplerate/bitrate
- if streamdetails.bit_depth > bit_depth:
+ # check the PCM samplerate/bitrate
+ if not resample and streamdetails.bit_depth > bit_depth:
await queue.queue_stream_signal_next()
self.logger.debug("Abort queue stream due to bit depth mismatch")
await queue.queue_stream_signal_next()
break
if (
- streamdetails.sample_rate > sample_rate
+ not resample
+ and streamdetails.sample_rate > sample_rate
and streamdetails.sample_rate <= queue.max_sample_rate
):
self.logger.debug("Abort queue stream due to sample rate mismatch")
await queue.queue_stream_signal_next()
break
- pcm_fmt = ContentType.from_bit_depth(bit_depth)
sample_size = int(sample_rate * (bit_depth / 8) * channels) # 1 second
buffer_size = sample_size * (
queue.crossfade_duration or 1
) # 1...10 seconds
self.logger.debug(
- "Start Streaming queue track: %s (%s) for player %s - PCM format: %s - rate: %s",
+ "Start Streaming queue track: %s (%s) for queue %s",
queue_track.item_id,
queue_track.name,
queue.player.name,
- pcm_fmt.value,
- sample_rate,
)
fade_in_part = b""
cur_chunk = 0
# end of queue reached, pass last fadeout bits to final output
yield last_fadeout_data
# END OF QUEUE STREAM
+ self.logger.info("Queue stream for Queue %s finished.", queue.queue_id)