Fix unsollicited (re)starts of queue playback (#438)
authorMarcel van der Veldt <m.vanderveldt@outlook.com>
Mon, 25 Jul 2022 17:44:58 +0000 (19:44 +0200)
committerGitHub <noreply@github.com>
Mon, 25 Jul 2022 17:44:58 +0000 (19:44 +0200)
* Fix unsollicited (re)starts of queue playback

.github/workflows/test.yml
music_assistant/controllers/streams.py
music_assistant/models/player_queue.py

index 8030183460381e6830004c1f6f2cd406403cffe8..f95fb618d586f9cdacb92421ca2df1a6713ff089 100644 (file)
@@ -24,7 +24,6 @@ jobs:
           python-version: ${{ matrix.python-version }}
       - name: Install dependencies
         run: |
-          apt-get update
           apt-get install ffmpeg
           python -m pip install --upgrade pip
           pip install -r requirements_dev.txt
index d18d556f085c7c2d65cee1a90eb3f9f48cd65a24..ec8edf7b57c3a4bf3fe76d47ed0c8288d7492e51 100644 (file)
@@ -74,10 +74,6 @@ class StreamsController:
         enc_track_id = urllib.parse.quote(track_id)
         return f"{self.base_url}/preview?provider_id={provider.value}&item_id={enc_track_id}"
 
-    def get_control_url(self, queue_id: str, control: str = "next") -> str:
-        """Return url to control endpoint."""
-        return f"{self.base_url}/{queue_id}/{control}"
-
     def get_silence_url(
         self,
         content_type: ContentType = ContentType.WAV,
@@ -92,7 +88,6 @@ class StreamsController:
 
         app.router.add_get("/preview", self.serve_preview)
         app.router.add_get("/silence.{fmt}", self.serve_silence)
-        app.router.add_get("/{queue_id}/{control}", self.serve_control)
         app.router.add_get("/{stream_id}.{fmt}", self.serve_queue_stream)
 
         runner = web.AppRunner(app, access_log=None)
@@ -124,24 +119,6 @@ class StreamsController:
 
         self.logger.info("Started stream server on port %s", self._port)
 
-    async def serve_control(self, request: web.Request):
-        """Server player control endpoint."""
-        queue_id = request.match_info["queue_id"]
-        control = request.match_info["control"]
-        if queue := self.mass.players.get_player_queue(queue_id):
-            if control == "next" and queue.signal_next is None:
-                await queue.next()
-
-        resp = web.StreamResponse(
-            status=200, reason="OK", headers={"Content-Type": "audio/wav"}
-        )
-        await resp.prepare(request)
-        if request.method == "GET":
-            # service 1 second of silence while player is processing request
-            async for chunk in get_silence(1, ContentType.WAV):
-                await resp.write(chunk)
-        return resp
-
     async def serve_preview(self, request: web.Request):
         """Serve short preview sample."""
         provider_id = request.query["provider_id"]
@@ -354,7 +331,7 @@ class QueueStream:
         self.done = asyncio.Event()
         self.all_clients_connected = asyncio.Event()
         self.index_in_buffer = start_index
-        self.signal_next: bool = False
+        self.signal_next: Optional[int] = None
         self._runner_task: Optional[asyncio.Task] = None
         self._prev_chunk: bytes = b""
         if queue.settings.metadata_mode == MetadataMode.LEGACY:
index 0757b5d4c580eb4c356540823c6ad30c6d80534f..ff5ae60daeb52c7d968f757ff10e0f469d5bc9e0 100644 (file)
@@ -69,13 +69,12 @@ class PlayerQueue:
         self.mass = mass
         self.logger = mass.players.logger
         self.queue_id = player_id
-        self.signal_next: Optional[int] = 0
         self._stream_id: str = ""
         self._settings = QueueSettings(self)
         self._current_index: Optional[int] = None
         self._current_item_elapsed_time: int = 0
         self._prev_item: Optional[QueueItem] = None
-        self._last_player_state: Tuple[str, str] = ("", "")
+        self._last_player_state: Tuple[PlayerState, str] = (PlayerState.OFF, "")
         self._items: List[QueueItem] = []
         self._save_task: TimerHandle = None
         self._last_player_update: int = 0
@@ -377,7 +376,8 @@ class PlayerQueue:
         if self.announcement_in_progress:
             self.logger.warning("Ignore queue command: An announcement is in progress")
             return
-        self.signal_next = None
+        if stream := self.stream:
+            stream.signal_next = None
         # redirect to underlying player
         await self.player.stop()
 
@@ -408,7 +408,7 @@ class PlayerQueue:
 
     async def next(self) -> None:
         """Play the next track in the queue."""
-        next_index = self.get_next_index(self._current_index)
+        next_index = self.get_next_index(self._current_index, True)
         if next_index is None:
             return None
         await self.play_index(next_index)
@@ -441,23 +441,26 @@ class PlayerQueue:
             await self.player.play_url(last_player_url)
             return
         resume_item = self.current_item
+        next_item = self.next_item
         resume_pos = self._current_item_elapsed_time
         if (
             resume_item
+            and next_item
             and resume_item.duration
             and resume_pos > (resume_item.duration * 0.9)
         ):
             # track is already played for > 90% - skip to next
-            resume_item = self.next_item
+            resume_item = next_item
+            resume_pos = 0
+        elif self._current_index is None and len(self._items) > 0:
+            # items available in queue but no previous track, start at 0
+            resume_item = self.get_item(0)
             resume_pos = 0
 
         if resume_item is not None:
             resume_pos = resume_pos if resume_pos > 10 else 0
             fade_in = resume_pos > 0
             await self.play_index(resume_item.item_id, resume_pos, fade_in)
-        elif len(self._items) > 0:
-            # items available in queue but no previous track, start at 0
-            await self.play_index(0)
         else:
             self.logger.warning(
                 "resume queue requested for %s but queue is empty", self.queue_id
@@ -492,7 +495,7 @@ class PlayerQueue:
             self._current_index = self._snapshot.index
             self._current_item_elapsed_time = self._snapshot.position
             self._last_player_state = (
-                self._snapshot.state.value,
+                self._snapshot.state,
                 self._snapshot.player_url,
             )
             if self._snapshot.state in (PlayerState.PLAYING, PlayerState.PAUSED):
@@ -519,6 +522,9 @@ class PlayerQueue:
         if self.announcement_in_progress:
             self.logger.warning("Ignore queue command: An announcement is in progress")
             return
+        if stream := self.stream:
+            # make sure that the previous stream is not auto restarted (race condition)
+            stream.signal_next = None
         if not isinstance(index, int):
             index = self.index_by_id(index)
         if index is None:
@@ -652,39 +658,53 @@ class PlayerQueue:
 
     def on_player_update(self) -> None:
         """Call when player updates."""
-        cur_player_state = (self.player.state.value, self.player.current_url)
-        if self._last_player_state != cur_player_state:
-            # playback state changed
+        prev_state = self._last_player_state
+        new_state = (self.player.state, self.player.current_url)
+
+        # handle PlayerState changed
+        if new_state[0] != prev_state[0]:
+
+            # store previous state
             if self.announcement_in_progress:
                 # while announcement in progress dont update the last url
                 # to allow us to resume from 3rd party sources
                 # https://github.com/music-assistant/hass-music-assistant/issues/697
-                self._last_player_state = (
-                    cur_player_state[0],
-                    self._last_player_state[1],
-                )
+                self._last_player_state = (new_state[0], prev_state[1])
             else:
-                self._last_player_state = cur_player_state
-
-            # always signal update if playback state changed
-            self.signal_update()
-            if self.player.state == PlayerState.IDLE:
+                self._last_player_state = new_state
+
+            # the queue stream was aborted on purpose and needs to restart
+            if (
+                prev_state[0] == PlayerState.PLAYING
+                and new_state[0] == PlayerState.IDLE
+                and self.stream
+                and self.stream.signal_next is not None
+            ):
+                # the queue stream was aborted on purpose (e.g. because of sample rate mismatch)
+                # we need to restart the stream with the next index
+                self._current_item_elapsed_time = 0
+                self.mass.create_task(self.play_index(self.stream.signal_next))
+                return
 
-                # handle case where stream stopped on purpose and we need to restart it
-                if self.signal_next is not None:
-                    self._current_item_elapsed_time = 0
-                    next_idx = self.signal_next
-                    self.signal_next = None
-                    self.mass.create_task(self.play_index(next_idx))
+            # queue exhausted or player turned off/stopped
+            if (
+                self.stream
+                and new_state[0] in (PlayerState.IDLE, PlayerState.OFF)
+                or not self.player.available
+            ):
+                self.stream.signal_next = None
+                # handle last track of the queue, set the index to index that is out of range
+                if self._current_index >= (len(self._items) - 1):
+                    self._current_index += 1
 
-        self.update_state()
+        # always signal update if the PlayerState changed
+        if new_state != prev_state:
+            self.signal_update()
 
-    def update_state(self) -> None:
-        """Update queue details, called when player updates."""
-        if self.player.active_queue != self:
-            return
-        if not self.active:
+        # update queue details if we're the active queue for the attached player
+        if self.player.active_queue != self or not self.active:
             return
+
         new_index = self._current_index
         track_time = self._current_item_elapsed_time
         new_item_loaded = False
@@ -738,10 +758,10 @@ class PlayerQueue:
         self._current_index = start_index
         return stream
 
-    def get_next_index(self, cur_index: Optional[int]) -> int:
+    def get_next_index(self, cur_index: Optional[int], is_skip: bool = False) -> int:
         """Return the next index for the queue, accounting for repeat settings."""
         # handle repeat single track
-        if self.settings.repeat_mode == RepeatMode.ONE:
+        if self.settings.repeat_mode == RepeatMode.ONE and not is_skip:
             return cur_index
         # handle repeat all
         if (
@@ -759,16 +779,35 @@ class PlayerQueue:
     def signal_update(self, items_changed: bool = False) -> None:
         """Signal state changed of this queue."""
         if items_changed:
-            self.mass.create_task(self._save_items())
             self.mass.signal_event(
                 MassEvent(
                     EventType.QUEUE_ITEMS_UPDATED, object_id=self.queue_id, data=self
                 )
             )
+            # save items
+            self.mass.create_task(
+                self.mass.cache.set(
+                    f"queue.{self.queue_id}.items",
+                    [x.to_dict() for x in self._items],
+                )
+            )
+
         # always send the base event
         self.mass.signal_event(
             MassEvent(EventType.QUEUE_UPDATED, object_id=self.queue_id, data=self)
         )
+        # save state
+        self.mass.create_task(
+            self.mass.database.set_setting(
+                f"queue.{self.queue_id}.current_index", self._current_index
+            )
+        )
+        self.mass.create_task(
+            self.mass.database.set_setting(
+                f"queue.{self.queue_id}.current_item_elapsed_time",
+                self._current_item_elapsed_time,
+            )
+        )
 
     def to_dict(self) -> Dict[str, Any]:
         """Export object to dict."""
@@ -833,31 +872,25 @@ class PlayerQueue:
 
     async def _restore_items(self) -> None:
         """Try to load the saved state from cache."""
-        if queue_cache := await self.mass.cache.get(f"queue_items.{self.queue_id}"):
+        if queue_cache := await self.mass.cache.get(f"queue.{self.queue_id}.items"):
             try:
-                self._items = [QueueItem.from_dict(x) for x in queue_cache["items"]]
-                self._current_index = queue_cache["current_index"]
-                self._current_item_elapsed_time = queue_cache.get(
-                    "current_item_elapsed_time", 0
-                )
+                self._items = [QueueItem.from_dict(x) for x in queue_cache]
             except (KeyError, AttributeError, TypeError) as err:
                 self.logger.warning(
                     "Unable to restore queue state for queue %s",
                     self.queue_id,
                     exc_info=err,
                 )
-        await self.settings.restore()
+            else:
+                # restore state too
+                db_key = f"queue.{self.queue_id}.current_index"
+                if db_value := await self.mass.database.get_setting(db_key):
+                    self._current_index = int(db_value)
+                db_key = f"queue.{self.queue_id}.current_item_elapsed_time"
+                if db_value := await self.mass.database.get_setting(db_key):
+                    self._current_item_elapsed_time = int(db_value)
 
-    async def _save_items(self) -> None:
-        """Save current queue items/state in cache."""
-        await self.mass.cache.set(
-            f"queue_items.{self.queue_id}",
-            {
-                "items": [x.to_dict() for x in self._items],
-                "current_index": self._current_index,
-                "current_item_elapsed_time": self._current_item_elapsed_time,
-            },
-        )
+        await self.settings.restore()
 
     async def _wait_for_state(
         self,