Fix playback on Kodi/OSMC (#381)
authorMarcel van der Veldt <m.vanderveldt@outlook.com>
Wed, 22 Jun 2022 20:19:12 +0000 (22:19 +0200)
committerGitHub <noreply@github.com>
Wed, 22 Jun 2022 20:19:12 +0000 (22:19 +0200)
- support sending icy metadata
- fix playback on players that do multiple get requests

music_assistant/controllers/streams.py
music_assistant/helpers/audio.py
music_assistant/models/player_queue.py

index f076719794805dc64367d5a4b7392c1d16c3e4d5..d3567dc8c772d5b9c90379761c78a49ed24c10b9 100644 (file)
@@ -13,12 +13,12 @@ from aiohttp import web
 
 from music_assistant.helpers.audio import (
     check_audio_support,
-    create_wave_header,
     crossfade_pcm_parts,
     fadein_pcm_part,
     get_chunksize,
     get_media_stream,
     get_preview_stream,
+    get_silence,
     get_stream_details,
     strip_silence,
 )
@@ -81,8 +81,9 @@ class StreamsController:
         app = web.Application()
 
         app.router.add_get("/preview", self.serve_preview)
+        app.router.add_get("/silence.{fmt}", self.serve_silence)
         app.router.add_get("/{queue_id}/{control}", self.serve_control)
-        app.router.add_get("/{stream_id}.{format}", self.serve_queue_stream)
+        app.router.add_get("/{stream_id}.{fmt}", self.serve_queue_stream)
 
         runner = web.AppRunner(app, access_log=None)
         await runner.setup()
@@ -127,9 +128,8 @@ class StreamsController:
         await resp.prepare(request)
         if request.method == "GET":
             # service 60 seconds of silence while player is processing request
-            await resp.write(create_wave_header(duration=60))
-            for _ in range(0, 60):
-                await resp.write(b"\0" * 1764000)
+            async for chunk in get_silence(60, ContentType.WAV):
+                await resp.write(chunk)
         return resp
 
     async def serve_preview(self, request: web.Request):
@@ -144,8 +144,30 @@ class StreamsController:
             await resp.write(chunk)
         return resp
 
+    @staticmethod
+    async def serve_silence(request: web.Request):
+        """Serve some nice silence."""
+        duration = int(request.query.get("duration", 60))
+        fmt = ContentType.try_parse(request.match_info["fmt"])
+
+        resp = web.StreamResponse(
+            status=200, reason="OK", headers={"Content-Type": f"audio/{fmt.value}"}
+        )
+        await resp.prepare(request)
+        if request.method == "GET":
+            async for chunk in get_silence(duration, fmt):
+                await resp.write(chunk)
+        return resp
+
     async def serve_queue_stream(self, request: web.Request):
         """Serve queue audio stream to a single player."""
+        self.logger.debug(
+            "Got %s request to %s from %s\nheaders: %s\n",
+            request.method,
+            request.path,
+            request.remote,
+            request.headers,
+        )
         stream_id = request.match_info["stream_id"]
         queue_stream = self.queue_streams.get(stream_id)
 
@@ -157,15 +179,51 @@ class StreamsController:
         headers = {
             "Content-Type": f"audio/{queue_stream.output_format.value}",
             "transferMode.dlna.org": "Streaming",
-            "Connection": "Close",
+            "icy-name": "Streaming from Music Assistant",
+            "icy-pub": "0",
+            "Cache-Control": "no-cache",
+            "icy-metaint": str(queue_stream.chunk_size),
             "contentFeatures.dlna.org": "DLNA.ORG_OP=00;DLNA.ORG_CI=0;DLNA.ORG_FLAGS=0d500000000000000000000000000000",
         }
+
         resp = web.StreamResponse(headers=headers)
         await resp.prepare(request)
 
-        if request.method == "GET":
-            client_id = request.remote
-            await queue_stream.subscribe(client_id, resp.write)
+        if request.method != "GET":
+            # do not start stream on HEAD request
+            return resp
+
+        client_id = request.remote
+        enable_icy = request.headers.get("Icy-MetaData", "") == "1"
+
+        # regular streaming - each chunk is sent to the callback here
+        # this chunk is already encoded to the requested audio format of choice.
+        # optional ICY metadata can be sent to the client if it supports that
+        async def audio_callback(chunk: bytes) -> None:
+            """Call when a new audio chunk arrives."""
+            # write audio
+            await resp.write(chunk)
+
+            # ICY metadata support
+            if not enable_icy:
+                return
+
+            # if icy metadata is enabled, send the icy metadata after the chunk
+            item_in_buf = queue_stream.queue.get_item(queue_stream.index_in_buffer)
+            if item_in_buf and item_in_buf.name:
+                title = item_in_buf.name
+            else:
+                title = "Music Assistant"
+            image = item_in_buf.image or ""
+            metadata = f"StreamTitle='{title}';StreamUrl='&picture={image}';".encode()
+            while len(metadata) % 16 != 0:
+                metadata += b"\x00"
+            length = len(metadata)
+            length_b = chr(int(length / 16)).encode()
+            await resp.write(length_b + metadata)
+
+        await queue_stream.subscribe(client_id, audio_callback)
+        await resp.write_eof()
 
         return resp
 
@@ -287,7 +345,9 @@ class QueueStream:
         self.all_clients_connected = asyncio.Event()
         self.index_in_buffer = start_index
         self.signal_next: bool = False
+        self.chunk_size = get_chunksize(output_format)
         self._runner_task: Optional[asyncio.Task] = None
+        self._prev_chunks: Dict[str, bytes] = {}
         if autostart:
             self.mass.create_task(self.start())
 
@@ -305,6 +365,7 @@ class QueueStream:
 
         self._runner_task = None
         self.connected_clients = {}
+        self._prev_chunks = {}
 
         # run garbage collection manually due to the high number of
         # processed bytes blocks
@@ -320,14 +381,18 @@ class QueueStream:
         self.logger.debug("client connected: %s", client_id)
         if len(self.connected_clients) == self.expected_clients:
             self.all_clients_connected.set()
+
+        if client_id in self._prev_chunks:
+            self.logger.warning(
+                "Reconnect of player %s detected, playback may be disturbed", client_id
+            )
+            await callback(self._prev_chunks[client_id])
         try:
             await self.done.wait()
         finally:
             self.connected_clients.pop(client_id, None)
             self.logger.debug("client disconnected: %s", client_id)
-            if len(self.connected_clients) == 0:
-                # no more clients, perform cleanup
-                await self.stop()
+            await self._check_stop()
 
     async def _queue_stream_runner(self) -> None:
         """Distribute audio chunks over connected client queues."""
@@ -350,6 +415,9 @@ class QueueStream:
             str(self.pcm_sample_rate),
             "-i",
             "-",
+            # add metadata
+            "-metadata",
+            "title=Streaming from Music Assistant",
             # output args
             "-f",
             self.output_format.value,
@@ -359,11 +427,10 @@ class QueueStream:
         ]
         # get the raw pcm bytes from the queue stream and on the fly encode to wanted format
         # send the compressed/encoded stream to the client(s).
-        chunk_size = get_chunksize(self.output_format)
         sample_size = int(
             self.pcm_sample_rate * (self.pcm_bit_depth / 8) * self.pcm_channels
         )
-        async with AsyncProcess(ffmpeg_args, True, chunk_size) as ffmpeg_proc:
+        async with AsyncProcess(ffmpeg_args, True, self.chunk_size) as ffmpeg_proc:
 
             async def writer():
                 """Task that sends the raw pcm audio to the ffmpeg process."""
@@ -389,13 +456,16 @@ class QueueStream:
             self.streaming_started = time()
 
             # Read bytes from final output and send chunk to child callback.
+            chunk_num = 0
             async for chunk in ffmpeg_proc.iterate_chunks():
+                chunk_num += 1
+
                 if len(self.connected_clients) == 0:
                     # no more clients
-                    self.done.set()
-                    self.logger.debug("Abort: all clients diconnected.")
-                    return
+                    if await self._check_stop():
+                        return
                 for client_id in set(self.connected_clients.keys()):
+                    self._prev_chunks[client_id] = chunk
                     try:
                         callback = self.connected_clients[client_id]
                         await callback(chunk)
@@ -406,7 +476,9 @@ class QueueStream:
                     ):
                         self.connected_clients.pop(client_id, None)
 
-                del chunk
+                # back off a bit after first chunk to handle reconnecting clients (e.g. kodi)
+                if chunk_num == 1:
+                    await asyncio.sleep(0.5)
 
             # complete queue streamed
             if self.signal_next:
@@ -659,3 +731,13 @@ class QueueStream:
         del last_fadeout_data
         # END OF QUEUE STREAM
         self.logger.debug("Queue stream for Queue %s finished.", self.queue.player.name)
+
+    async def _check_stop(self) -> bool:
+        """Schedule stop of queue stream."""
+        # Stop this queue stream when no clients (re)connected within 5 seconds
+        for _ in range(0, 10):
+            if len(self.connected_clients) > 0:
+                return False
+            await asyncio.sleep(0.5)
+        self.mass.create_task(self.stop())
+        return True
index 152f3038996b00ddb06043f7ab389070d49748dc..e8e737562862f6bf66bc72db65b70af5b312f007 100644 (file)
@@ -631,6 +631,49 @@ async def get_preview_stream(
             yield chunk
 
 
+async def get_silence(
+    duration: int,
+    output_fmt: ContentType = ContentType.WAV,
+    sample_rate: int = 44100,
+    bit_depth: int = 16,
+    channels: int = 2,
+) -> AsyncGenerator[bytes, None]:
+    """Create stream of silence, encoded to format of choice."""
+
+    # wav silence = just zero's
+    if output_fmt == ContentType.WAV:
+        yield create_wave_header(
+            samplerate=sample_rate,
+            channels=2,
+            bitspersample=bit_depth,
+            duration=duration,
+        )
+        for _ in range(0, duration):
+            yield b"\0" * int(sample_rate * (bit_depth / 8) * channels)
+        return
+
+    # use ffmpeg for all other encodings
+    args = [
+        "ffmpeg",
+        "-hide_banner",
+        "-loglevel",
+        "error",
+        "-f",
+        "lavfi",
+        "-i",
+        f"anullsrc=r={sample_rate}:cl={'stereo' if channels == 2 else 'mono'}",
+        "-t",
+        str(duration),
+        "-f",
+        output_fmt.value,
+        "-",
+    ]
+    chunk_size = get_chunksize(output_fmt)
+    async with AsyncProcess(args, chunk_size=chunk_size) as ffmpeg_proc:
+        async for chunk in ffmpeg_proc.iterate_chunks():
+            yield chunk
+
+
 def get_chunksize(content_type: ContentType) -> int:
     """Get a default chunksize for given contenttype."""
     if content_type.is_pcm():
index 7e63343a8a6e95d400a2c4f12b696c26a5bdfbfa..88834c120f3a2f55b8ed05a4e6e03ebac57addd0 100644 (file)
@@ -115,7 +115,7 @@ class PlayerQueue:
         if not self.player.current_url:
             return False
         if stream := self.stream:
-            return stream.url == self.player.current_url
+            return stream.stream_id in self.player.current_url
         return False
 
     @property