From: Marcel van der Veldt Date: Sat, 18 Jun 2022 11:50:50 +0000 (+0200) Subject: Fix queue corruption issue (#374) X-Git-Url: https://git.kitaultman.com/?a=commitdiff_plain;h=f827d152f78b2c8eef5804d3a17860c2603260ef;p=music-assistant-server.git Fix queue corruption issue (#374) * Fix queue get corrupt when adding single item * fix race condition while playing TTS if player is still off --- diff --git a/music_assistant/models/player_queue.py b/music_assistant/models/player_queue.py index aeb0a872..17156309 100644 --- a/music_assistant/models/player_queue.py +++ b/music_assistant/models/player_queue.py @@ -308,23 +308,23 @@ class PlayerQueue: # start queue with alert sound(s) self._items = queue_items - self._settings.repeat_mode = RepeatMode.OFF - self._settings.shuffle_enabled = False await self.queue_stream_start(0, 0, False, is_alert=True) # wait for the player to finish playing - alert_done = asyncio.Event() + play_started = asyncio.Event() + play_stopped = asyncio.Event() def handle_event(evt: MassEvent): - if self.player.state != PlayerState.PLAYING: - alert_done.set() + if self.player.state == PlayerState.PLAYING: + play_started.set() + elif play_started.is_set(): + play_stopped.set() unsub = self.mass.subscribe( handle_event, EventType.QUEUE_UPDATED, self.queue_id ) try: - await asyncio.wait_for(self.stream.done.wait(), 30) - await asyncio.wait_for(alert_done.wait(), 30) + await asyncio.wait_for(play_stopped.wait(), 30) except asyncio.TimeoutError: self.logger.warning("Timeout while playing alert") finally: @@ -430,7 +430,7 @@ class PlayerQueue: # restore queue self._settings.repeat_mode = self._snapshot.repeat self._settings.shuffle_enabled = self._snapshot.shuffle - await self.update(self._snapshot.items) + await self._update_items(self._snapshot.items) self._current_index = self._snapshot.index self._current_item_elapsed_time = self._snapshot.position if self._snapshot.state in (PlayerState.PLAYING, PlayerState.PAUSED): @@ -481,7 +481,7 @@ class PlayerQueue: return # move the item in the list items.insert(new_index, items.pop(item_index)) - await self.update(items) + await self._update_items(items) async def delete_item(self, queue_item_id: str) -> None: """Delete item (by id or index) from the queue.""" @@ -548,25 +548,23 @@ class PlayerQueue: for index, item in enumerate(queue_items): item.sort_index = len(self.items) + index if self.settings.shuffle_enabled: + # if shuffle is enabled we shuffle the remaining tracks and the new ones played_items = self.items[:cur_index] next_items = self.items[cur_index + 1 :] + queue_items next_items = random.sample(next_items, len(next_items)) - items = played_items + [self.current_item] + next_items - await self.update(items) - return - self._items = self._items + queue_items - self.signal_update(True) - - async def update(self, queue_items: List[QueueItem]) -> None: - """Update the existing queue items, mostly caused by reordering.""" - self._items = queue_items - self.signal_update(True) + if self.current_item: + queue_items = played_items + [self.current_item] + next_items + else: + queue_items = played_items + next_items + else: + queue_items = self._items + queue_items + await self._update_items(queue_items) async def clear(self) -> None: """Clear all items in the queue.""" if self.player.state not in (PlayerState.IDLE, PlayerState.OFF): await self.stop() - await self.update([]) + await self._update_items([]) def on_player_update(self) -> None: """Call when player updates.""" @@ -685,12 +683,14 @@ class PlayerQueue: def get_next_index(self, cur_index: Optional[int]) -> int: """Return the next index for the queue, accounting for repeat settings.""" + alert_active = self.stream and self.stream.is_alert # handle repeat single track - if self.settings.repeat_mode == RepeatMode.ONE: + if not alert_active and self.settings.repeat_mode == RepeatMode.ONE: return cur_index # handle repeat all if ( - self.settings.repeat_mode == RepeatMode.ALL + not alert_active + and self.settings.repeat_mode == RepeatMode.ALL and self._items and cur_index == (len(self._items) - 1) ): @@ -735,6 +735,11 @@ class PlayerQueue: "settings": self.settings.to_dict(), } + async def _update_items(self, queue_items: List[QueueItem]) -> None: + """Update the existing queue items, mostly caused by reordering.""" + self._items = queue_items + self.signal_update(True) + def __get_queue_stream_index(self) -> Tuple[int, int]: """Calculate current queue index and current track elapsed time.""" # player is playing a constant stream so we need to do this the hard way diff --git a/music_assistant/models/queue_settings.py b/music_assistant/models/queue_settings.py index 62cac2aa..53b70bac 100644 --- a/music_assistant/models/queue_settings.py +++ b/music_assistant/models/queue_settings.py @@ -55,7 +55,7 @@ class QueueSettings: # can be extended with some more magic based on last_played and stuff next_items = random.sample(next_items, len(next_items)) items = played_items + [self._queue.current_item] + next_items - asyncio.create_task(self._queue.update(items)) + asyncio.create_task(self._queue.load(items)) self._on_update("shuffle_enabled") elif self._shuffle_enabled and not enabled: # unshuffle @@ -65,7 +65,7 @@ class QueueSettings: next_items = self._queue.items[self._queue.current_index + 1 :] next_items.sort(key=lambda x: x.sort_index, reverse=False) items = played_items + [self._queue.current_item] + next_items - asyncio.create_task(self._queue.update(items)) + asyncio.create_task(self._queue.load(items)) self._on_update("shuffle_enabled") @property