flow_mode: bool = False
resume_pos: int = 0
flow_mode_stream_log: list[PlayLogEntry] = field(default_factory=list)
+ next_track_enqueued: str | None = None
@property
def corrected_elapsed_time(self) -> float:
"""Return the corrected/realtime elapsed time."""
return self.elapsed_time + (time.time() - self.elapsed_time_last_updated)
+ def __post_serialize__(self, d: dict[Any, Any]) -> dict[Any, Any]:
+ """Execute action(s) on serialization."""
+ d.pop("flow_mode_stream_log", None)
+ d.pop("enqueued_media_items", None)
+ d.pop("next_track_enqueued", None)
+ return d
+
def to_cache(self) -> dict[str, Any]:
"""Return the dict that is suitable for storing into the cache db."""
d = self.to_dict()
d.pop("next_item", None)
d.pop("index_in_buffer", None)
d.pop("flow_mode", None)
- d.pop("flow_mode_stream_log", None)
- d.pop("enqueued_media_items", None)
return d
@classmethod
d.pop("next_item", None)
d.pop("index_in_buffer", None)
d.pop("flow_mode", None)
- d.pop("flow_mode_stream_log", None)
d.pop("enqueued_media_items", None)
+ d.pop("next_track_enqueued", None)
+ d.pop("flow_mode_stream_log", None)
return cls.from_dict(d)
import asyncio
import random
import time
-from contextlib import suppress
from typing import TYPE_CHECKING, Any, TypedDict
from music_assistant.common.helpers.util import get_changed_keys
return # no change
queue.repeat_mode = repeat_mode
self.signal_update(queue_id)
- # ensure that we trigger enqueue next if repeat mode changed (if needed/supported)
- task_id = f"enqueue_next_{queue_id}"
- self.mass.call_later(5, self._enqueue_next, queue, queue.current_index, task_id=task_id)
@api_command("player_queues/play_media")
async def play_media(
queue.flow_mode = await self.mass.config.get_player_config_value(queue_id, CONF_FLOW_MODE)
next_index = self._get_next_index(queue_id, index, allow_repeat=False)
queue.current_item = queue_item
+ queue.next_track_enqueued = None
self.signal_update(queue_id)
# work out if we are playing an album and if we should prefer album loudness
# and has an item loaded so we are able to resume it
queue.state = player.state or PlayerState.IDLE
queue.current_item = self.get_item(queue_id, queue.current_index)
- queue.next_item = self._get_next_item(queue_id)
+ queue.next_item = (
+ self.get_item(queue_id, queue.next_track_enqueued)
+ if queue.next_track_enqueued
+ else self._get_next_item(queue_id, queue.current_index)
+ )
# correct elapsed time when seeking
if (
):
queue.elapsed_time += queue.current_item.streamdetails.seek_position
+ # enqueue next track if needed
+ if (
+ queue.state == PlayerState.PLAYING
+ and queue.next_item is not None
+ and not queue.next_track_enqueued
+ and queue.corrected_elapsed_time > 2
+ ):
+ self._check_enqueue_next(queue)
+
# basic throttle: do not send state changed events if queue did not actually change
prev_state = self._prev_states.get(
queue_id,
object_id=queue_item.media_item.uri,
data=round(seconds_streamed, 2),
)
+
if end_of_queue_reached:
# end of queue reached, clear items
self.mass.call_later(
5, self._check_clear_queue, queue, task_id=f"clear_queue_{queue_id}"
)
+ # clear 'next track enqueued' flag if new track is loaded
+ if prev_state["current_index"] != new_state["current_index"]:
+ queue.next_track_enqueued = None
+
# watch dynamic radio items refill if needed
elif "current_index" in changed_keys:
if (
if queue.flow_mode:
return # nothing to do when flow mode is active
self.signal_update(queue_id)
- # enqueue the next track as soon as the player reports
- # it has started buffering the given queue item
- if not queue.flow_mode:
- task_id = f"enqueue_next_{queue_id}"
- self.mass.call_later(5, self._enqueue_next, queue, item_id, task_id=task_id)
# Main queue manipulation methods
self._queue_items[queue_id] = queue_items
self._queues[queue_id].items = len(self._queue_items[queue_id])
self.signal_update(queue_id, True)
+ self._queues[queue_id].next_track_enqueued = None
# Helper methods
insert_at_index=len(self._queue_items[queue_id]) + 1,
)
- async def _enqueue_next(self, queue: PlayerQueue, current_index: int | str) -> None:
- """Enqueue the next item in the queue."""
+ def _check_enqueue_next(self, queue: PlayerQueue) -> None:
+ """Enqueue the next item in the queue (if needed)."""
if queue.flow_mode:
return
- if isinstance(current_index, str):
- current_index = self.index_by_id(queue.queue_id, current_index)
- with suppress(QueueEmpty):
- next_item = await self.load_next_item(queue.queue_id, current_index)
+ if queue.next_item is None:
+ return
+ if queue.next_track_enqueued == queue.next_item.queue_item_id:
+ return
+
+ async def _enqueue_next():
+ next_item = await self.load_next_item(queue.queue_id, queue.current_index)
+ queue.next_track_enqueued = next_item.queue_item_id
await self.mass.players.enqueue_next_media(
player_id=queue.queue_id,
- media=self.player_media_from_queue_item(next_item, queue.flow_mode),
+ media=self.player_media_from_queue_item(next_item, False),
)
+ self.mass.create_task(_enqueue_next())
+
async def _get_radio_tracks(
self, queue_id: str, is_initial_radio_mode: bool = False
) -> list[Track]: