From: Marcel van der Veldt Date: Wed, 22 Jun 2022 20:19:12 +0000 (+0200) Subject: Fix playback on Kodi/OSMC (#381) X-Git-Url: https://git.kitaultman.com/?a=commitdiff_plain;h=b4e35b02a4be5e05dcdc3cb1ae88be457a2b84fa;p=music-assistant-server.git Fix playback on Kodi/OSMC (#381) - support sending icy metadata - fix playback on players that do multiple get requests --- diff --git a/music_assistant/controllers/streams.py b/music_assistant/controllers/streams.py index f0767197..d3567dc8 100644 --- a/music_assistant/controllers/streams.py +++ b/music_assistant/controllers/streams.py @@ -13,12 +13,12 @@ from aiohttp import web from music_assistant.helpers.audio import ( check_audio_support, - create_wave_header, crossfade_pcm_parts, fadein_pcm_part, get_chunksize, get_media_stream, get_preview_stream, + get_silence, get_stream_details, strip_silence, ) @@ -81,8 +81,9 @@ class StreamsController: app = web.Application() app.router.add_get("/preview", self.serve_preview) + app.router.add_get("/silence.{fmt}", self.serve_silence) app.router.add_get("/{queue_id}/{control}", self.serve_control) - app.router.add_get("/{stream_id}.{format}", self.serve_queue_stream) + app.router.add_get("/{stream_id}.{fmt}", self.serve_queue_stream) runner = web.AppRunner(app, access_log=None) await runner.setup() @@ -127,9 +128,8 @@ class StreamsController: await resp.prepare(request) if request.method == "GET": # service 60 seconds of silence while player is processing request - await resp.write(create_wave_header(duration=60)) - for _ in range(0, 60): - await resp.write(b"\0" * 1764000) + async for chunk in get_silence(60, ContentType.WAV): + await resp.write(chunk) return resp async def serve_preview(self, request: web.Request): @@ -144,8 +144,30 @@ class StreamsController: await resp.write(chunk) return resp + @staticmethod + async def serve_silence(request: web.Request): + """Serve some nice silence.""" + duration = int(request.query.get("duration", 60)) + fmt = ContentType.try_parse(request.match_info["fmt"]) + + resp = web.StreamResponse( + status=200, reason="OK", headers={"Content-Type": f"audio/{fmt.value}"} + ) + await resp.prepare(request) + if request.method == "GET": + async for chunk in get_silence(duration, fmt): + await resp.write(chunk) + return resp + async def serve_queue_stream(self, request: web.Request): """Serve queue audio stream to a single player.""" + self.logger.debug( + "Got %s request to %s from %s\nheaders: %s\n", + request.method, + request.path, + request.remote, + request.headers, + ) stream_id = request.match_info["stream_id"] queue_stream = self.queue_streams.get(stream_id) @@ -157,15 +179,51 @@ class StreamsController: headers = { "Content-Type": f"audio/{queue_stream.output_format.value}", "transferMode.dlna.org": "Streaming", - "Connection": "Close", + "icy-name": "Streaming from Music Assistant", + "icy-pub": "0", + "Cache-Control": "no-cache", + "icy-metaint": str(queue_stream.chunk_size), "contentFeatures.dlna.org": "DLNA.ORG_OP=00;DLNA.ORG_CI=0;DLNA.ORG_FLAGS=0d500000000000000000000000000000", } + resp = web.StreamResponse(headers=headers) await resp.prepare(request) - if request.method == "GET": - client_id = request.remote - await queue_stream.subscribe(client_id, resp.write) + if request.method != "GET": + # do not start stream on HEAD request + return resp + + client_id = request.remote + enable_icy = request.headers.get("Icy-MetaData", "") == "1" + + # regular streaming - each chunk is sent to the callback here + # this chunk is already encoded to the requested audio format of choice. + # optional ICY metadata can be sent to the client if it supports that + async def audio_callback(chunk: bytes) -> None: + """Call when a new audio chunk arrives.""" + # write audio + await resp.write(chunk) + + # ICY metadata support + if not enable_icy: + return + + # if icy metadata is enabled, send the icy metadata after the chunk + item_in_buf = queue_stream.queue.get_item(queue_stream.index_in_buffer) + if item_in_buf and item_in_buf.name: + title = item_in_buf.name + else: + title = "Music Assistant" + image = item_in_buf.image or "" + metadata = f"StreamTitle='{title}';StreamUrl='&picture={image}';".encode() + while len(metadata) % 16 != 0: + metadata += b"\x00" + length = len(metadata) + length_b = chr(int(length / 16)).encode() + await resp.write(length_b + metadata) + + await queue_stream.subscribe(client_id, audio_callback) + await resp.write_eof() return resp @@ -287,7 +345,9 @@ class QueueStream: self.all_clients_connected = asyncio.Event() self.index_in_buffer = start_index self.signal_next: bool = False + self.chunk_size = get_chunksize(output_format) self._runner_task: Optional[asyncio.Task] = None + self._prev_chunks: Dict[str, bytes] = {} if autostart: self.mass.create_task(self.start()) @@ -305,6 +365,7 @@ class QueueStream: self._runner_task = None self.connected_clients = {} + self._prev_chunks = {} # run garbage collection manually due to the high number of # processed bytes blocks @@ -320,14 +381,18 @@ class QueueStream: self.logger.debug("client connected: %s", client_id) if len(self.connected_clients) == self.expected_clients: self.all_clients_connected.set() + + if client_id in self._prev_chunks: + self.logger.warning( + "Reconnect of player %s detected, playback may be disturbed", client_id + ) + await callback(self._prev_chunks[client_id]) try: await self.done.wait() finally: self.connected_clients.pop(client_id, None) self.logger.debug("client disconnected: %s", client_id) - if len(self.connected_clients) == 0: - # no more clients, perform cleanup - await self.stop() + await self._check_stop() async def _queue_stream_runner(self) -> None: """Distribute audio chunks over connected client queues.""" @@ -350,6 +415,9 @@ class QueueStream: str(self.pcm_sample_rate), "-i", "-", + # add metadata + "-metadata", + "title=Streaming from Music Assistant", # output args "-f", self.output_format.value, @@ -359,11 +427,10 @@ class QueueStream: ] # get the raw pcm bytes from the queue stream and on the fly encode to wanted format # send the compressed/encoded stream to the client(s). - chunk_size = get_chunksize(self.output_format) sample_size = int( self.pcm_sample_rate * (self.pcm_bit_depth / 8) * self.pcm_channels ) - async with AsyncProcess(ffmpeg_args, True, chunk_size) as ffmpeg_proc: + async with AsyncProcess(ffmpeg_args, True, self.chunk_size) as ffmpeg_proc: async def writer(): """Task that sends the raw pcm audio to the ffmpeg process.""" @@ -389,13 +456,16 @@ class QueueStream: self.streaming_started = time() # Read bytes from final output and send chunk to child callback. + chunk_num = 0 async for chunk in ffmpeg_proc.iterate_chunks(): + chunk_num += 1 + if len(self.connected_clients) == 0: # no more clients - self.done.set() - self.logger.debug("Abort: all clients diconnected.") - return + if await self._check_stop(): + return for client_id in set(self.connected_clients.keys()): + self._prev_chunks[client_id] = chunk try: callback = self.connected_clients[client_id] await callback(chunk) @@ -406,7 +476,9 @@ class QueueStream: ): self.connected_clients.pop(client_id, None) - del chunk + # back off a bit after first chunk to handle reconnecting clients (e.g. kodi) + if chunk_num == 1: + await asyncio.sleep(0.5) # complete queue streamed if self.signal_next: @@ -659,3 +731,13 @@ class QueueStream: del last_fadeout_data # END OF QUEUE STREAM self.logger.debug("Queue stream for Queue %s finished.", self.queue.player.name) + + async def _check_stop(self) -> bool: + """Schedule stop of queue stream.""" + # Stop this queue stream when no clients (re)connected within 5 seconds + for _ in range(0, 10): + if len(self.connected_clients) > 0: + return False + await asyncio.sleep(0.5) + self.mass.create_task(self.stop()) + return True diff --git a/music_assistant/helpers/audio.py b/music_assistant/helpers/audio.py index 152f3038..e8e73756 100644 --- a/music_assistant/helpers/audio.py +++ b/music_assistant/helpers/audio.py @@ -631,6 +631,49 @@ async def get_preview_stream( yield chunk +async def get_silence( + duration: int, + output_fmt: ContentType = ContentType.WAV, + sample_rate: int = 44100, + bit_depth: int = 16, + channels: int = 2, +) -> AsyncGenerator[bytes, None]: + """Create stream of silence, encoded to format of choice.""" + + # wav silence = just zero's + if output_fmt == ContentType.WAV: + yield create_wave_header( + samplerate=sample_rate, + channels=2, + bitspersample=bit_depth, + duration=duration, + ) + for _ in range(0, duration): + yield b"\0" * int(sample_rate * (bit_depth / 8) * channels) + return + + # use ffmpeg for all other encodings + args = [ + "ffmpeg", + "-hide_banner", + "-loglevel", + "error", + "-f", + "lavfi", + "-i", + f"anullsrc=r={sample_rate}:cl={'stereo' if channels == 2 else 'mono'}", + "-t", + str(duration), + "-f", + output_fmt.value, + "-", + ] + chunk_size = get_chunksize(output_fmt) + async with AsyncProcess(args, chunk_size=chunk_size) as ffmpeg_proc: + async for chunk in ffmpeg_proc.iterate_chunks(): + yield chunk + + def get_chunksize(content_type: ContentType) -> int: """Get a default chunksize for given contenttype.""" if content_type.is_pcm(): diff --git a/music_assistant/models/player_queue.py b/music_assistant/models/player_queue.py index 7e63343a..88834c12 100644 --- a/music_assistant/models/player_queue.py +++ b/music_assistant/models/player_queue.py @@ -115,7 +115,7 @@ class PlayerQueue: if not self.player.current_url: return False if stream := self.stream: - return stream.url == self.player.current_url + return stream.stream_id in self.player.current_url return False @property