if isinstance(index, str):
index = self.index_by_id(queue_id, index)
queue.current_index = index
+ # update current item and elapsed time and signal update
+ # this way the UI knows immediately that a new item is loading
+ queue.current_item = self.get_item(queue_id, index)
+ queue.elapsed_time = seek_position
+ self.signal_update(queue_id)
queue.index_in_buffer = index
queue.flow_mode_stream_log = []
prefer_flow_mode = await self.mass.config.get_player_config_value(queue_id, CONF_FLOW_MODE)
# send play_media request to player
# NOTE that we debounce this a bit to account for someone hitting the next button
# like a madman. This will prevent the player from being overloaded with requests.
- async def _play_index(index: int) -> None:
+ async def _play_index(index: int, debounce: bool) -> None:
+ if debounce:
+ await asyncio.sleep(0.25)
for attempt in range(5):
try:
queue_item = self.get_item(queue_id, index)
MediaType.RADIO,
MediaType.PLUGIN_SOURCE,
)
+ if debounce:
+ await asyncio.sleep(0.25)
queue.flow_mode = flow_mode
await self.mass.players.play_media(
player_id=queue_id,
# we set a flag to notify the update logic that we're transitioning to a new track
self._transitioning_players.add(queue_id)
- self.mass.call_later(
- 1 if debounce else 0,
+ task = self.mass.create_task(
_play_index,
index,
+ debounce,
task_id=f"play_media_{queue_id}",
+ abort_existing=True,
)
self.signal_update(queue_id)
+ await task
@api_command("player_queues/transfer")
async def transfer_queue(
self._writer_task = self.mass.create_task(self._writer())
# send server(version) info when client connects
- self._send_message(self.mass.get_server_info())
+ await self._send_message(self.mass.get_server_info())
# forward all events to clients
def handle_event(event: MassEvent) -> None:
- self._send_message(event)
+ self._send_message_sync(event)
unsub_callback = self.mass.subscribe(handle_event)
disconnect_warn = f"Received invalid JSON: {msg.data}"
break
- self._handle_command(command_msg)
+ await self._handle_command(command_msg)
except asyncio.CancelledError:
self._logger.debug("Connection closed by client")
return wsock
- def _handle_command(self, msg: CommandMessage) -> None:
+ async def _handle_command(self, msg: CommandMessage) -> None:
"""Handle an incoming command from the client."""
self._logger.debug("Handling command %s", msg.command)
handler = self.mass.command_handlers.get(msg.command)
if handler is None:
- self._send_message(
+ await self._send_message(
ErrorResultMessage(
msg.message_id,
InvalidCommand.error_code,
async for item in iterator:
result.append(item)
if len(result) >= 500:
- self._send_message(
+ await self._send_message(
SuccessResultMessage(msg.message_id, result, partial=True)
)
result = []
elif asyncio.iscoroutine(result):
result = await result
- self._send_message(SuccessResultMessage(msg.message_id, result))
+ await self._send_message(SuccessResultMessage(msg.message_id, result))
except Exception as err:
if self._logger.isEnabledFor(logging.DEBUG):
self._logger.exception("Error handling message: %s", msg)
else:
self._logger.error("Error handling message: %s: %s", msg.command, str(err))
err_msg = str(err) or err.__class__.__name__
- self._send_message(
+ await self._send_message(
ErrorResultMessage(msg.message_id, getattr(err, "error_code", 999), err_msg)
)
self._logger.log(VERBOSE_LOG_LEVEL, "Writing: %s", message)
await self.wsock.send_str(message)
- def _send_message(self, message: MessageType) -> None:
- """Send a message to the client.
+ async def _send_message(self, message: MessageType) -> None:
+ """Send a message to the client (for large response messages).
+ Runs JSON serialization in executor to avoid blocking for large messages.
Closes connection if the client is not reading the messages.
Async friendly.
"""
+ # Run JSON serialization in executor to avoid blocking for large messages
+ loop = asyncio.get_running_loop()
+ _message = await loop.run_in_executor(None, message.to_json)
+
+ try:
+ self._to_write.put_nowait(_message)
+ except asyncio.QueueFull:
+ self._logger.error("Client exceeded max pending messages: %s", MAX_PENDING_MSG)
+
+ self._cancel()
+
+ def _send_message_sync(self, message: MessageType) -> None:
+ """Send a message from a sync context (for small messages like events).
+
+ Serializes inline without executor overhead since events are typically small.
+ """
_message = message.to_json()
try:
self._data_available.notify_all()
self._space_available.notify_all()
- # Run garbage collection in executor to reclaim memory from large buffers
+ # Run garbage collection in background to reclaim memory from large buffers
+ # Don't await it to avoid blocking during task cancellation
loop = asyncio.get_running_loop()
- await loop.run_in_executor(None, gc.collect)
+ loop.run_in_executor(None, gc.collect)
async def set_eof(self) -> None:
"""Signal that no more data will be added to the buffer."""
from functools import wraps
from typing import Final, ParamSpec
+from music_assistant.helpers.util import empty_queue
+
# Type variables for the buffered decorator
_P = ParamSpec("_P")
buffer: asyncio.Queue[bytes | None] = asyncio.Queue(maxsize=buffer_size)
producer_error: Exception | None = None
threshold_reached = asyncio.Event()
+ cancelled = asyncio.Event()
if buffer_size <= 1:
# No buffering needed, yield directly
nonlocal producer_error
try:
async for chunk in generator:
+ if cancelled.is_set():
+ # Consumer has stopped, exit cleanly
+ break
await buffer.put(chunk)
if not threshold_reached.is_set() and buffer.qsize() >= min_buffer_before_yield:
threshold_reached.set()
- except asyncio.CancelledError:
- # Task was cancelled, clean up the generator
- with contextlib.suppress(RuntimeError, asyncio.CancelledError):
- await generator.aclose()
- raise
except Exception as err:
producer_error = err
- # Consumer probably stopped consuming, close the original generator
- with contextlib.suppress(RuntimeError, asyncio.CancelledError):
- await generator.aclose()
finally:
threshold_reached.set()
+ # Clean up the generator
+ with contextlib.suppress(RuntimeError, asyncio.CancelledError):
+ await generator.aclose()
# Signal end of stream by putting None
with contextlib.suppress(asyncio.QueueFull):
- await buffer.put(None)
+ buffer.put_nowait(None)
# Start the producer task
loop = asyncio.get_running_loop()
yield data
finally:
- # Ensure the producer task is cleaned up
- if not producer_task.done():
- producer_task.cancel()
+ # Signal the producer to stop
+ cancelled.set()
+ # Drain the queue to unblock the producer if it's waiting on put()
+ empty_queue(buffer)
+ # Wait for the producer to finish cleanly
with contextlib.suppress(asyncio.CancelledError, RuntimeError):
await producer_task
# cancel the current audio source task
assert self._audio_source_task # for type checker
self._audio_source_task.cancel()
- with suppress(asyncio.CancelledError):
+ with suppress(asyncio.CancelledError, RuntimeError):
await self._audio_source_task
# set new audio source and restart the stream
self._audio_source = audio_source