from music_assistant.helpers.audio import (
check_audio_support,
- create_wave_header,
crossfade_pcm_parts,
fadein_pcm_part,
get_chunksize,
get_media_stream,
get_preview_stream,
+ get_silence,
get_stream_details,
strip_silence,
)
app = web.Application()
app.router.add_get("/preview", self.serve_preview)
+ app.router.add_get("/silence.{fmt}", self.serve_silence)
app.router.add_get("/{queue_id}/{control}", self.serve_control)
- app.router.add_get("/{stream_id}.{format}", self.serve_queue_stream)
+ app.router.add_get("/{stream_id}.{fmt}", self.serve_queue_stream)
runner = web.AppRunner(app, access_log=None)
await runner.setup()
await resp.prepare(request)
if request.method == "GET":
# service 60 seconds of silence while player is processing request
- await resp.write(create_wave_header(duration=60))
- for _ in range(0, 60):
- await resp.write(b"\0" * 1764000)
+ async for chunk in get_silence(60, ContentType.WAV):
+ await resp.write(chunk)
return resp
async def serve_preview(self, request: web.Request):
await resp.write(chunk)
return resp
+ @staticmethod
+ async def serve_silence(request: web.Request):
+ """Serve some nice silence."""
+ duration = int(request.query.get("duration", 60))
+ fmt = ContentType.try_parse(request.match_info["fmt"])
+
+ resp = web.StreamResponse(
+ status=200, reason="OK", headers={"Content-Type": f"audio/{fmt.value}"}
+ )
+ await resp.prepare(request)
+ if request.method == "GET":
+ async for chunk in get_silence(duration, fmt):
+ await resp.write(chunk)
+ return resp
+
async def serve_queue_stream(self, request: web.Request):
"""Serve queue audio stream to a single player."""
+ self.logger.debug(
+ "Got %s request to %s from %s\nheaders: %s\n",
+ request.method,
+ request.path,
+ request.remote,
+ request.headers,
+ )
stream_id = request.match_info["stream_id"]
queue_stream = self.queue_streams.get(stream_id)
headers = {
"Content-Type": f"audio/{queue_stream.output_format.value}",
"transferMode.dlna.org": "Streaming",
- "Connection": "Close",
+ "icy-name": "Streaming from Music Assistant",
+ "icy-pub": "0",
+ "Cache-Control": "no-cache",
+ "icy-metaint": str(queue_stream.chunk_size),
"contentFeatures.dlna.org": "DLNA.ORG_OP=00;DLNA.ORG_CI=0;DLNA.ORG_FLAGS=0d500000000000000000000000000000",
}
+
resp = web.StreamResponse(headers=headers)
await resp.prepare(request)
- if request.method == "GET":
- client_id = request.remote
- await queue_stream.subscribe(client_id, resp.write)
+ if request.method != "GET":
+ # do not start stream on HEAD request
+ return resp
+
+ client_id = request.remote
+ enable_icy = request.headers.get("Icy-MetaData", "") == "1"
+
+ # regular streaming - each chunk is sent to the callback here
+ # this chunk is already encoded to the requested audio format of choice.
+ # optional ICY metadata can be sent to the client if it supports that
+ async def audio_callback(chunk: bytes) -> None:
+ """Call when a new audio chunk arrives."""
+ # write audio
+ await resp.write(chunk)
+
+ # ICY metadata support
+ if not enable_icy:
+ return
+
+ # if icy metadata is enabled, send the icy metadata after the chunk
+ item_in_buf = queue_stream.queue.get_item(queue_stream.index_in_buffer)
+ if item_in_buf and item_in_buf.name:
+ title = item_in_buf.name
+ else:
+ title = "Music Assistant"
+ image = item_in_buf.image or ""
+ metadata = f"StreamTitle='{title}';StreamUrl='&picture={image}';".encode()
+ while len(metadata) % 16 != 0:
+ metadata += b"\x00"
+ length = len(metadata)
+ length_b = chr(int(length / 16)).encode()
+ await resp.write(length_b + metadata)
+
+ await queue_stream.subscribe(client_id, audio_callback)
+ await resp.write_eof()
return resp
self.all_clients_connected = asyncio.Event()
self.index_in_buffer = start_index
self.signal_next: bool = False
+ self.chunk_size = get_chunksize(output_format)
self._runner_task: Optional[asyncio.Task] = None
+ self._prev_chunks: Dict[str, bytes] = {}
if autostart:
self.mass.create_task(self.start())
self._runner_task = None
self.connected_clients = {}
+ self._prev_chunks = {}
# run garbage collection manually due to the high number of
# processed bytes blocks
self.logger.debug("client connected: %s", client_id)
if len(self.connected_clients) == self.expected_clients:
self.all_clients_connected.set()
+
+ if client_id in self._prev_chunks:
+ self.logger.warning(
+ "Reconnect of player %s detected, playback may be disturbed", client_id
+ )
+ await callback(self._prev_chunks[client_id])
try:
await self.done.wait()
finally:
self.connected_clients.pop(client_id, None)
self.logger.debug("client disconnected: %s", client_id)
- if len(self.connected_clients) == 0:
- # no more clients, perform cleanup
- await self.stop()
+ await self._check_stop()
async def _queue_stream_runner(self) -> None:
"""Distribute audio chunks over connected client queues."""
str(self.pcm_sample_rate),
"-i",
"-",
+ # add metadata
+ "-metadata",
+ "title=Streaming from Music Assistant",
# output args
"-f",
self.output_format.value,
]
# get the raw pcm bytes from the queue stream and on the fly encode to wanted format
# send the compressed/encoded stream to the client(s).
- chunk_size = get_chunksize(self.output_format)
sample_size = int(
self.pcm_sample_rate * (self.pcm_bit_depth / 8) * self.pcm_channels
)
- async with AsyncProcess(ffmpeg_args, True, chunk_size) as ffmpeg_proc:
+ async with AsyncProcess(ffmpeg_args, True, self.chunk_size) as ffmpeg_proc:
async def writer():
"""Task that sends the raw pcm audio to the ffmpeg process."""
self.streaming_started = time()
# Read bytes from final output and send chunk to child callback.
+ chunk_num = 0
async for chunk in ffmpeg_proc.iterate_chunks():
+ chunk_num += 1
+
if len(self.connected_clients) == 0:
# no more clients
- self.done.set()
- self.logger.debug("Abort: all clients diconnected.")
- return
+ if await self._check_stop():
+ return
for client_id in set(self.connected_clients.keys()):
+ self._prev_chunks[client_id] = chunk
try:
callback = self.connected_clients[client_id]
await callback(chunk)
):
self.connected_clients.pop(client_id, None)
- del chunk
+ # back off a bit after first chunk to handle reconnecting clients (e.g. kodi)
+ if chunk_num == 1:
+ await asyncio.sleep(0.5)
# complete queue streamed
if self.signal_next:
del last_fadeout_data
# END OF QUEUE STREAM
self.logger.debug("Queue stream for Queue %s finished.", self.queue.player.name)
+
+ async def _check_stop(self) -> bool:
+ """Schedule stop of queue stream."""
+ # Stop this queue stream when no clients (re)connected within 5 seconds
+ for _ in range(0, 10):
+ if len(self.connected_clients) > 0:
+ return False
+ await asyncio.sleep(0.5)
+ self.mass.create_task(self.stop())
+ return True