Fix queue corruption issue (#374)
authorMarcel van der Veldt <m.vanderveldt@outlook.com>
Sat, 18 Jun 2022 11:50:50 +0000 (13:50 +0200)
committerGitHub <noreply@github.com>
Sat, 18 Jun 2022 11:50:50 +0000 (13:50 +0200)
* Fix queue get corrupt when adding single item

* fix race condition while playing TTS if player is still off

music_assistant/models/player_queue.py
music_assistant/models/queue_settings.py

index aeb0a87250c7ef33f62a6f26d19be351421ca474..171563090d1745216c2e49bd23b409cf6f4d045b 100644 (file)
@@ -308,23 +308,23 @@ class PlayerQueue:
 
         # start queue with alert sound(s)
         self._items = queue_items
-        self._settings.repeat_mode = RepeatMode.OFF
-        self._settings.shuffle_enabled = False
         await self.queue_stream_start(0, 0, False, is_alert=True)
 
         # wait for the player to finish playing
-        alert_done = asyncio.Event()
+        play_started = asyncio.Event()
+        play_stopped = asyncio.Event()
 
         def handle_event(evt: MassEvent):
-            if self.player.state != PlayerState.PLAYING:
-                alert_done.set()
+            if self.player.state == PlayerState.PLAYING:
+                play_started.set()
+            elif play_started.is_set():
+                play_stopped.set()
 
         unsub = self.mass.subscribe(
             handle_event, EventType.QUEUE_UPDATED, self.queue_id
         )
         try:
-            await asyncio.wait_for(self.stream.done.wait(), 30)
-            await asyncio.wait_for(alert_done.wait(), 30)
+            await asyncio.wait_for(play_stopped.wait(), 30)
         except asyncio.TimeoutError:
             self.logger.warning("Timeout while playing alert")
         finally:
@@ -430,7 +430,7 @@ class PlayerQueue:
         # restore queue
         self._settings.repeat_mode = self._snapshot.repeat
         self._settings.shuffle_enabled = self._snapshot.shuffle
-        await self.update(self._snapshot.items)
+        await self._update_items(self._snapshot.items)
         self._current_index = self._snapshot.index
         self._current_item_elapsed_time = self._snapshot.position
         if self._snapshot.state in (PlayerState.PLAYING, PlayerState.PAUSED):
@@ -481,7 +481,7 @@ class PlayerQueue:
             return
         # move the item in the list
         items.insert(new_index, items.pop(item_index))
-        await self.update(items)
+        await self._update_items(items)
 
     async def delete_item(self, queue_item_id: str) -> None:
         """Delete item (by id or index) from the queue."""
@@ -548,25 +548,23 @@ class PlayerQueue:
         for index, item in enumerate(queue_items):
             item.sort_index = len(self.items) + index
         if self.settings.shuffle_enabled:
+            # if shuffle is enabled we shuffle the remaining tracks and the new ones
             played_items = self.items[:cur_index]
             next_items = self.items[cur_index + 1 :] + queue_items
             next_items = random.sample(next_items, len(next_items))
-            items = played_items + [self.current_item] + next_items
-            await self.update(items)
-            return
-        self._items = self._items + queue_items
-        self.signal_update(True)
-
-    async def update(self, queue_items: List[QueueItem]) -> None:
-        """Update the existing queue items, mostly caused by reordering."""
-        self._items = queue_items
-        self.signal_update(True)
+            if self.current_item:
+                queue_items = played_items + [self.current_item] + next_items
+            else:
+                queue_items = played_items + next_items
+        else:
+            queue_items = self._items + queue_items
+        await self._update_items(queue_items)
 
     async def clear(self) -> None:
         """Clear all items in the queue."""
         if self.player.state not in (PlayerState.IDLE, PlayerState.OFF):
             await self.stop()
-        await self.update([])
+        await self._update_items([])
 
     def on_player_update(self) -> None:
         """Call when player updates."""
@@ -685,12 +683,14 @@ class PlayerQueue:
 
     def get_next_index(self, cur_index: Optional[int]) -> int:
         """Return the next index for the queue, accounting for repeat settings."""
+        alert_active = self.stream and self.stream.is_alert
         # handle repeat single track
-        if self.settings.repeat_mode == RepeatMode.ONE:
+        if not alert_active and self.settings.repeat_mode == RepeatMode.ONE:
             return cur_index
         # handle repeat all
         if (
-            self.settings.repeat_mode == RepeatMode.ALL
+            not alert_active
+            and self.settings.repeat_mode == RepeatMode.ALL
             and self._items
             and cur_index == (len(self._items) - 1)
         ):
@@ -735,6 +735,11 @@ class PlayerQueue:
             "settings": self.settings.to_dict(),
         }
 
+    async def _update_items(self, queue_items: List[QueueItem]) -> None:
+        """Update the existing queue items, mostly caused by reordering."""
+        self._items = queue_items
+        self.signal_update(True)
+
     def __get_queue_stream_index(self) -> Tuple[int, int]:
         """Calculate current queue index and current track elapsed time."""
         # player is playing a constant stream so we need to do this the hard way
index 62cac2aa1b6684a6ddc1a46becff03c3e7cdaed9..53b70baca24267f6d4650cab0c99609ec5d14f97 100644 (file)
@@ -55,7 +55,7 @@ class QueueSettings:
                 # can be extended with some more magic based on last_played and stuff
                 next_items = random.sample(next_items, len(next_items))
                 items = played_items + [self._queue.current_item] + next_items
-                asyncio.create_task(self._queue.update(items))
+                asyncio.create_task(self._queue.load(items))
                 self._on_update("shuffle_enabled")
         elif self._shuffle_enabled and not enabled:
             # unshuffle
@@ -65,7 +65,7 @@ class QueueSettings:
                 next_items = self._queue.items[self._queue.current_index + 1 :]
                 next_items.sort(key=lambda x: x.sort_index, reverse=False)
                 items = played_items + [self._queue.current_item] + next_items
-                asyncio.create_task(self._queue.update(items))
+                asyncio.create_task(self._queue.load(items))
                 self._on_update("shuffle_enabled")
 
     @property