From: Marcel van der Veldt Date: Sat, 25 Oct 2025 19:24:14 +0000 (+0200) Subject: Various small speed and safety fixes X-Git-Url: https://git.kitaultman.com/?a=commitdiff_plain;h=fc3f3da524b94d44786cd752c54977eafcc3eaa4;p=music-assistant-server.git Various small speed and safety fixes --- diff --git a/music_assistant/controllers/player_queues.py b/music_assistant/controllers/player_queues.py index 21efbf00..5631090a 100644 --- a/music_assistant/controllers/player_queues.py +++ b/music_assistant/controllers/player_queues.py @@ -805,6 +805,11 @@ class PlayerQueuesController(CoreController): if isinstance(index, str): index = self.index_by_id(queue_id, index) queue.current_index = index + # update current item and elapsed time and signal update + # this way the UI knows immediately that a new item is loading + queue.current_item = self.get_item(queue_id, index) + queue.elapsed_time = seek_position + self.signal_update(queue_id) queue.index_in_buffer = index queue.flow_mode_stream_log = [] prefer_flow_mode = await self.mass.config.get_player_config_value(queue_id, CONF_FLOW_MODE) @@ -823,7 +828,9 @@ class PlayerQueuesController(CoreController): # send play_media request to player # NOTE that we debounce this a bit to account for someone hitting the next button # like a madman. This will prevent the player from being overloaded with requests. - async def _play_index(index: int) -> None: + async def _play_index(index: int, debounce: bool) -> None: + if debounce: + await asyncio.sleep(0.25) for attempt in range(5): try: queue_item = self.get_item(queue_id, index) @@ -856,6 +863,8 @@ class PlayerQueuesController(CoreController): MediaType.RADIO, MediaType.PLUGIN_SOURCE, ) + if debounce: + await asyncio.sleep(0.25) queue.flow_mode = flow_mode await self.mass.players.play_media( player_id=queue_id, @@ -866,13 +875,15 @@ class PlayerQueuesController(CoreController): # we set a flag to notify the update logic that we're transitioning to a new track self._transitioning_players.add(queue_id) - self.mass.call_later( - 1 if debounce else 0, + task = self.mass.create_task( _play_index, index, + debounce, task_id=f"play_media_{queue_id}", + abort_existing=True, ) self.signal_update(queue_id) + await task @api_command("player_queues/transfer") async def transfer_queue( diff --git a/music_assistant/controllers/webserver.py b/music_assistant/controllers/webserver.py index e551a4bd..04e2f7c8 100644 --- a/music_assistant/controllers/webserver.py +++ b/music_assistant/controllers/webserver.py @@ -303,11 +303,11 @@ class WebsocketClientHandler: self._writer_task = self.mass.create_task(self._writer()) # send server(version) info when client connects - self._send_message(self.mass.get_server_info()) + await self._send_message(self.mass.get_server_info()) # forward all events to clients def handle_event(event: MassEvent) -> None: - self._send_message(event) + self._send_message_sync(event) unsub_callback = self.mass.subscribe(handle_event) @@ -331,7 +331,7 @@ class WebsocketClientHandler: disconnect_warn = f"Received invalid JSON: {msg.data}" break - self._handle_command(command_msg) + await self._handle_command(command_msg) except asyncio.CancelledError: self._logger.debug("Connection closed by client") @@ -360,7 +360,7 @@ class WebsocketClientHandler: return wsock - def _handle_command(self, msg: CommandMessage) -> None: + async def _handle_command(self, msg: CommandMessage) -> None: """Handle an incoming command from the client.""" self._logger.debug("Handling command %s", msg.command) @@ -368,7 +368,7 @@ class WebsocketClientHandler: handler = self.mass.command_handlers.get(msg.command) if handler is None: - self._send_message( + await self._send_message( ErrorResultMessage( msg.message_id, InvalidCommand.error_code, @@ -392,20 +392,20 @@ class WebsocketClientHandler: async for item in iterator: result.append(item) if len(result) >= 500: - self._send_message( + await self._send_message( SuccessResultMessage(msg.message_id, result, partial=True) ) result = [] elif asyncio.iscoroutine(result): result = await result - self._send_message(SuccessResultMessage(msg.message_id, result)) + await self._send_message(SuccessResultMessage(msg.message_id, result)) except Exception as err: if self._logger.isEnabledFor(logging.DEBUG): self._logger.exception("Error handling message: %s", msg) else: self._logger.error("Error handling message: %s: %s", msg.command, str(err)) err_msg = str(err) or err.__class__.__name__ - self._send_message( + await self._send_message( ErrorResultMessage(msg.message_id, getattr(err, "error_code", 999), err_msg) ) @@ -424,13 +424,30 @@ class WebsocketClientHandler: self._logger.log(VERBOSE_LOG_LEVEL, "Writing: %s", message) await self.wsock.send_str(message) - def _send_message(self, message: MessageType) -> None: - """Send a message to the client. + async def _send_message(self, message: MessageType) -> None: + """Send a message to the client (for large response messages). + Runs JSON serialization in executor to avoid blocking for large messages. Closes connection if the client is not reading the messages. Async friendly. """ + # Run JSON serialization in executor to avoid blocking for large messages + loop = asyncio.get_running_loop() + _message = await loop.run_in_executor(None, message.to_json) + + try: + self._to_write.put_nowait(_message) + except asyncio.QueueFull: + self._logger.error("Client exceeded max pending messages: %s", MAX_PENDING_MSG) + + self._cancel() + + def _send_message_sync(self, message: MessageType) -> None: + """Send a message from a sync context (for small messages like events). + + Serializes inline without executor overhead since events are typically small. + """ _message = message.to_json() try: diff --git a/music_assistant/helpers/audio_buffer.py b/music_assistant/helpers/audio_buffer.py index 24ff7d01..735a828b 100644 --- a/music_assistant/helpers/audio_buffer.py +++ b/music_assistant/helpers/audio_buffer.py @@ -253,9 +253,10 @@ class AudioBuffer: self._data_available.notify_all() self._space_available.notify_all() - # Run garbage collection in executor to reclaim memory from large buffers + # Run garbage collection in background to reclaim memory from large buffers + # Don't await it to avoid blocking during task cancellation loop = asyncio.get_running_loop() - await loop.run_in_executor(None, gc.collect) + loop.run_in_executor(None, gc.collect) async def set_eof(self) -> None: """Signal that no more data will be added to the buffer.""" diff --git a/music_assistant/helpers/buffered_generator.py b/music_assistant/helpers/buffered_generator.py index 53b35ad3..11b6c18b 100644 --- a/music_assistant/helpers/buffered_generator.py +++ b/music_assistant/helpers/buffered_generator.py @@ -8,6 +8,8 @@ from collections.abc import AsyncGenerator, Callable from functools import wraps from typing import Final, ParamSpec +from music_assistant.helpers.util import empty_queue + # Type variables for the buffered decorator _P = ParamSpec("_P") @@ -39,6 +41,7 @@ async def buffered( buffer: asyncio.Queue[bytes | None] = asyncio.Queue(maxsize=buffer_size) producer_error: Exception | None = None threshold_reached = asyncio.Event() + cancelled = asyncio.Event() if buffer_size <= 1: # No buffering needed, yield directly @@ -51,24 +54,22 @@ async def buffered( nonlocal producer_error try: async for chunk in generator: + if cancelled.is_set(): + # Consumer has stopped, exit cleanly + break await buffer.put(chunk) if not threshold_reached.is_set() and buffer.qsize() >= min_buffer_before_yield: threshold_reached.set() - except asyncio.CancelledError: - # Task was cancelled, clean up the generator - with contextlib.suppress(RuntimeError, asyncio.CancelledError): - await generator.aclose() - raise except Exception as err: producer_error = err - # Consumer probably stopped consuming, close the original generator - with contextlib.suppress(RuntimeError, asyncio.CancelledError): - await generator.aclose() finally: threshold_reached.set() + # Clean up the generator + with contextlib.suppress(RuntimeError, asyncio.CancelledError): + await generator.aclose() # Signal end of stream by putting None with contextlib.suppress(asyncio.QueueFull): - await buffer.put(None) + buffer.put_nowait(None) # Start the producer task loop = asyncio.get_running_loop() @@ -100,9 +101,11 @@ async def buffered( yield data finally: - # Ensure the producer task is cleaned up - if not producer_task.done(): - producer_task.cancel() + # Signal the producer to stop + cancelled.set() + # Drain the queue to unblock the producer if it's waiting on put() + empty_queue(buffer) + # Wait for the producer to finish cleanly with contextlib.suppress(asyncio.CancelledError, RuntimeError): await producer_task diff --git a/music_assistant/providers/airplay/raop.py b/music_assistant/providers/airplay/raop.py index 7c26662b..dd72d012 100644 --- a/music_assistant/providers/airplay/raop.py +++ b/music_assistant/providers/airplay/raop.py @@ -129,7 +129,7 @@ class RaopStreamSession: # cancel the current audio source task assert self._audio_source_task # for type checker self._audio_source_task.cancel() - with suppress(asyncio.CancelledError): + with suppress(asyncio.CancelledError, RuntimeError): await self._audio_source_task # set new audio source and restart the stream self._audio_source = audio_source