From: Marcel van der Veldt Date: Fri, 29 Jul 2022 10:52:39 +0000 (+0200) Subject: Fix playback on reconnecting players X-Git-Url: https://git.kitaultman.com/?a=commitdiff_plain;h=72aafa4a582def14cc5a6fc5e0742be0b82bf656;p=music-assistant-server.git Fix playback on reconnecting players --- diff --git a/music_assistant/controllers/streams.py b/music_assistant/controllers/streams.py index 3bf81372..3ed542fc 100644 --- a/music_assistant/controllers/streams.py +++ b/music_assistant/controllers/streams.py @@ -183,14 +183,17 @@ class StreamsController: # handle a second connection for the same player # this means either that the player itself want to skip to the next track - # or a misbehaving client which reconnects multiple times (e.g. Kodi) + # or a misbehaving client which reconnects multiple times (e.g. Kodi, Vlc) if client_id in queue_stream.connected_clients: self.logger.warning( "Simultanuous connections detected from %s, playback may be disturbed!", client_id, ) client_id += uuid4().hex - elif queue_stream.all_clients_connected.is_set(): + elif ( + queue_stream.all_clients_connected.is_set() + and queue_stream.total_seconds_streamed + ): self.logger.debug( "Got stream request for running stream: %s, assuming next", stream_id ) @@ -364,7 +367,6 @@ class QueueStream: self.index_in_buffer = start_index self.signal_next: Optional[int] = None self._runner_task: Optional[asyncio.Task] = None - self._prev_chunk: bytes = b"" if queue.settings.metadata_mode == MetadataMode.LEGACY: # use the legacy/recommended metaint size of 8192 bytes self.output_chunksize = ICY_CHUNKSIZE @@ -400,12 +402,7 @@ class QueueStream: async def subscribe(self, client_id: str, callback: CoroutineType[bytes]) -> None: """Subscribe callback and wait for completion.""" assert not self.done.is_set(), "Stream task is already finished" - if client_id in self.connected_clients: - self.logger.warning( - "Simultanuous connections detected from %s, playback may be disturbed", - client_id, - ) - client_id += uuid4().hex + # assert client_id not in self.connected_clients, "Client already connected" self.connected_clients[client_id] = callback self.logger.debug("client connected: %s", client_id) @@ -414,8 +411,8 @@ class QueueStream: try: await self.done.wait() finally: - self.connected_clients.pop(client_id, None) self.logger.debug("client disconnected: %s", client_id) + self.connected_clients.pop(client_id, None) await self._check_stop() async def _queue_stream_runner(self) -> None: @@ -481,6 +478,9 @@ class QueueStream: self.done.set() return self.logger.debug("%s clients connected", len(self.connected_clients)) + # TEMP fix - to remove in stream refactor + # some clients connect twice (browser, vlc, kodi) and we are not prepared for that + await asyncio.sleep(0.2) self.streaming_started = time() # Read bytes from final output and send chunk to child callback. @@ -493,7 +493,6 @@ class QueueStream: if await self._check_stop(): return for client_id in set(self.connected_clients.keys()): - self._prev_chunk = chunk try: callback = self.connected_clients[client_id] await callback(chunk) @@ -517,7 +516,7 @@ class QueueStream: self.total_seconds_streamed - self.queue.player.elapsed_time or 0 ) if player_buffered > 30: - await asyncio.sleep(1) + await asyncio.sleep(0.5) # all queue data has been streamed. Either because the queue is exhausted # or we need to restart the stream due to decoder/sample rate mismatch @@ -643,8 +642,6 @@ class QueueStream: # send signal that we've loaded a new track into the buffer self.index_in_buffer = queue_index self.queue.signal_update() - # precache the streamdetails for the next track - self.mass.create_task(self._precache_next_streamdetails()) buffer = b"" bytes_written = 0 @@ -773,16 +770,6 @@ class QueueStream: # END OF QUEUE STREAM self.logger.debug("Queue stream for Queue %s finished.", self.queue.player.name) - async def _precache_next_streamdetails(self) -> None: - """Prefetch the streamdetails for the next track.""" - next_index = self.queue.get_next_index(self.index_in_buffer) - if next_index <= self.index_in_buffer: - return - queue_track = self.queue.get_item(next_index) - if not queue_track: - return - await get_stream_details(self.mass, queue_track, self.queue.queue_id) - async def _check_stop(self) -> bool: """Schedule stop of queue stream.""" # Stop this queue stream when no clients (re)connected within 5 seconds