Various small speed and safety fixes
authorMarcel van der Veldt <m.vanderveldt@outlook.com>
Sat, 25 Oct 2025 19:24:14 +0000 (21:24 +0200)
committerMarcel van der Veldt <m.vanderveldt@outlook.com>
Sat, 25 Oct 2025 19:24:14 +0000 (21:24 +0200)
music_assistant/controllers/player_queues.py
music_assistant/controllers/webserver.py
music_assistant/helpers/audio_buffer.py
music_assistant/helpers/buffered_generator.py
music_assistant/providers/airplay/raop.py

index 21efbf00f7c06b66e75eb50d609040f7385f774a..5631090aa61a030669baaaadea7939c85270878d 100644 (file)
@@ -805,6 +805,11 @@ class PlayerQueuesController(CoreController):
         if isinstance(index, str):
             index = self.index_by_id(queue_id, index)
         queue.current_index = index
+        # update current item and elapsed time and signal update
+        # this way the UI knows immediately that a new item is loading
+        queue.current_item = self.get_item(queue_id, index)
+        queue.elapsed_time = seek_position
+        self.signal_update(queue_id)
         queue.index_in_buffer = index
         queue.flow_mode_stream_log = []
         prefer_flow_mode = await self.mass.config.get_player_config_value(queue_id, CONF_FLOW_MODE)
@@ -823,7 +828,9 @@ class PlayerQueuesController(CoreController):
         # send play_media request to player
         # NOTE that we debounce this a bit to account for someone hitting the next button
         # like a madman. This will prevent the player from being overloaded with requests.
-        async def _play_index(index: int) -> None:
+        async def _play_index(index: int, debounce: bool) -> None:
+            if debounce:
+                await asyncio.sleep(0.25)
             for attempt in range(5):
                 try:
                     queue_item = self.get_item(queue_id, index)
@@ -856,6 +863,8 @@ class PlayerQueuesController(CoreController):
                 MediaType.RADIO,
                 MediaType.PLUGIN_SOURCE,
             )
+            if debounce:
+                await asyncio.sleep(0.25)
             queue.flow_mode = flow_mode
             await self.mass.players.play_media(
                 player_id=queue_id,
@@ -866,13 +875,15 @@ class PlayerQueuesController(CoreController):
 
         # we set a flag to notify the update logic that we're transitioning to a new track
         self._transitioning_players.add(queue_id)
-        self.mass.call_later(
-            1 if debounce else 0,
+        task = self.mass.create_task(
             _play_index,
             index,
+            debounce,
             task_id=f"play_media_{queue_id}",
+            abort_existing=True,
         )
         self.signal_update(queue_id)
+        await task
 
     @api_command("player_queues/transfer")
     async def transfer_queue(
index e551a4bdda9ff2d1c4dea1c6c98c910b0c5aaf7c..04e2f7c81bbd1c271ebb52ce85b28a43e4de9153 100644 (file)
@@ -303,11 +303,11 @@ class WebsocketClientHandler:
         self._writer_task = self.mass.create_task(self._writer())
 
         # send server(version) info when client connects
-        self._send_message(self.mass.get_server_info())
+        await self._send_message(self.mass.get_server_info())
 
         # forward all events to clients
         def handle_event(event: MassEvent) -> None:
-            self._send_message(event)
+            self._send_message_sync(event)
 
         unsub_callback = self.mass.subscribe(handle_event)
 
@@ -331,7 +331,7 @@ class WebsocketClientHandler:
                     disconnect_warn = f"Received invalid JSON: {msg.data}"
                     break
 
-                self._handle_command(command_msg)
+                await self._handle_command(command_msg)
 
         except asyncio.CancelledError:
             self._logger.debug("Connection closed by client")
@@ -360,7 +360,7 @@ class WebsocketClientHandler:
 
         return wsock
 
-    def _handle_command(self, msg: CommandMessage) -> None:
+    async def _handle_command(self, msg: CommandMessage) -> None:
         """Handle an incoming command from the client."""
         self._logger.debug("Handling command %s", msg.command)
 
@@ -368,7 +368,7 @@ class WebsocketClientHandler:
         handler = self.mass.command_handlers.get(msg.command)
 
         if handler is None:
-            self._send_message(
+            await self._send_message(
                 ErrorResultMessage(
                     msg.message_id,
                     InvalidCommand.error_code,
@@ -392,20 +392,20 @@ class WebsocketClientHandler:
                 async for item in iterator:
                     result.append(item)
                     if len(result) >= 500:
-                        self._send_message(
+                        await self._send_message(
                             SuccessResultMessage(msg.message_id, result, partial=True)
                         )
                         result = []
             elif asyncio.iscoroutine(result):
                 result = await result
-            self._send_message(SuccessResultMessage(msg.message_id, result))
+            await self._send_message(SuccessResultMessage(msg.message_id, result))
         except Exception as err:
             if self._logger.isEnabledFor(logging.DEBUG):
                 self._logger.exception("Error handling message: %s", msg)
             else:
                 self._logger.error("Error handling message: %s: %s", msg.command, str(err))
             err_msg = str(err) or err.__class__.__name__
-            self._send_message(
+            await self._send_message(
                 ErrorResultMessage(msg.message_id, getattr(err, "error_code", 999), err_msg)
             )
 
@@ -424,13 +424,30 @@ class WebsocketClientHandler:
                 self._logger.log(VERBOSE_LOG_LEVEL, "Writing: %s", message)
                 await self.wsock.send_str(message)
 
-    def _send_message(self, message: MessageType) -> None:
-        """Send a message to the client.
+    async def _send_message(self, message: MessageType) -> None:
+        """Send a message to the client (for large response messages).
 
+        Runs JSON serialization in executor to avoid blocking for large messages.
         Closes connection if the client is not reading the messages.
 
         Async friendly.
         """
+        # Run JSON serialization in executor to avoid blocking for large messages
+        loop = asyncio.get_running_loop()
+        _message = await loop.run_in_executor(None, message.to_json)
+
+        try:
+            self._to_write.put_nowait(_message)
+        except asyncio.QueueFull:
+            self._logger.error("Client exceeded max pending messages: %s", MAX_PENDING_MSG)
+
+            self._cancel()
+
+    def _send_message_sync(self, message: MessageType) -> None:
+        """Send a message from a sync context (for small messages like events).
+
+        Serializes inline without executor overhead since events are typically small.
+        """
         _message = message.to_json()
 
         try:
index 24ff7d015b94b201c9f7aeedea2eee4e905fbd9c..735a828be2b88919f599aad8367db0a1c2099e0f 100644 (file)
@@ -253,9 +253,10 @@ class AudioBuffer:
             self._data_available.notify_all()
             self._space_available.notify_all()
 
-        # Run garbage collection in executor to reclaim memory from large buffers
+        # Run garbage collection in background to reclaim memory from large buffers
+        # Don't await it to avoid blocking during task cancellation
         loop = asyncio.get_running_loop()
-        await loop.run_in_executor(None, gc.collect)
+        loop.run_in_executor(None, gc.collect)
 
     async def set_eof(self) -> None:
         """Signal that no more data will be added to the buffer."""
index 53b35ad3d5bf5d9f9a4731dbd786e869a8774f0d..11b6c18b259c4f0e4284a7a008ff791006964835 100644 (file)
@@ -8,6 +8,8 @@ from collections.abc import AsyncGenerator, Callable
 from functools import wraps
 from typing import Final, ParamSpec
 
+from music_assistant.helpers.util import empty_queue
+
 # Type variables for the buffered decorator
 _P = ParamSpec("_P")
 
@@ -39,6 +41,7 @@ async def buffered(
     buffer: asyncio.Queue[bytes | None] = asyncio.Queue(maxsize=buffer_size)
     producer_error: Exception | None = None
     threshold_reached = asyncio.Event()
+    cancelled = asyncio.Event()
 
     if buffer_size <= 1:
         # No buffering needed, yield directly
@@ -51,24 +54,22 @@ async def buffered(
         nonlocal producer_error
         try:
             async for chunk in generator:
+                if cancelled.is_set():
+                    # Consumer has stopped, exit cleanly
+                    break
                 await buffer.put(chunk)
                 if not threshold_reached.is_set() and buffer.qsize() >= min_buffer_before_yield:
                     threshold_reached.set()
-        except asyncio.CancelledError:
-            # Task was cancelled, clean up the generator
-            with contextlib.suppress(RuntimeError, asyncio.CancelledError):
-                await generator.aclose()
-            raise
         except Exception as err:
             producer_error = err
-            # Consumer probably stopped consuming, close the original generator
-            with contextlib.suppress(RuntimeError, asyncio.CancelledError):
-                await generator.aclose()
         finally:
             threshold_reached.set()
+            # Clean up the generator
+            with contextlib.suppress(RuntimeError, asyncio.CancelledError):
+                await generator.aclose()
             # Signal end of stream by putting None
             with contextlib.suppress(asyncio.QueueFull):
-                await buffer.put(None)
+                buffer.put_nowait(None)
 
     # Start the producer task
     loop = asyncio.get_running_loop()
@@ -100,9 +101,11 @@ async def buffered(
             yield data
 
     finally:
-        # Ensure the producer task is cleaned up
-        if not producer_task.done():
-            producer_task.cancel()
+        # Signal the producer to stop
+        cancelled.set()
+        # Drain the queue to unblock the producer if it's waiting on put()
+        empty_queue(buffer)
+        # Wait for the producer to finish cleanly
         with contextlib.suppress(asyncio.CancelledError, RuntimeError):
             await producer_task
 
index 7c26662b7f86123a68c8f832b38f56252d7a9c34..dd72d012e7e21da47dd23243777cfe273656c327 100644 (file)
@@ -129,7 +129,7 @@ class RaopStreamSession:
         # cancel the current audio source task
         assert self._audio_source_task  # for type checker
         self._audio_source_task.cancel()
-        with suppress(asyncio.CancelledError):
+        with suppress(asyncio.CancelledError, RuntimeError):
             await self._audio_source_task
         # set new audio source and restart the stream
         self._audio_source = audio_source