From 974a89c7847cbc8ff5ea192af6f723a3eabeb96e Mon Sep 17 00:00:00 2001 From: Marcel van der Veldt Date: Mon, 25 Jul 2022 19:44:58 +0200 Subject: [PATCH] Fix unsollicited (re)starts of queue playback (#438) * Fix unsollicited (re)starts of queue playback --- .github/workflows/test.yml | 1 - music_assistant/controllers/streams.py | 25 +---- music_assistant/models/player_queue.py | 139 +++++++++++++++---------- 3 files changed, 87 insertions(+), 78 deletions(-) diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 80301834..f95fb618 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -24,7 +24,6 @@ jobs: python-version: ${{ matrix.python-version }} - name: Install dependencies run: | - apt-get update apt-get install ffmpeg python -m pip install --upgrade pip pip install -r requirements_dev.txt diff --git a/music_assistant/controllers/streams.py b/music_assistant/controllers/streams.py index d18d556f..ec8edf7b 100644 --- a/music_assistant/controllers/streams.py +++ b/music_assistant/controllers/streams.py @@ -74,10 +74,6 @@ class StreamsController: enc_track_id = urllib.parse.quote(track_id) return f"{self.base_url}/preview?provider_id={provider.value}&item_id={enc_track_id}" - def get_control_url(self, queue_id: str, control: str = "next") -> str: - """Return url to control endpoint.""" - return f"{self.base_url}/{queue_id}/{control}" - def get_silence_url( self, content_type: ContentType = ContentType.WAV, @@ -92,7 +88,6 @@ class StreamsController: app.router.add_get("/preview", self.serve_preview) app.router.add_get("/silence.{fmt}", self.serve_silence) - app.router.add_get("/{queue_id}/{control}", self.serve_control) app.router.add_get("/{stream_id}.{fmt}", self.serve_queue_stream) runner = web.AppRunner(app, access_log=None) @@ -124,24 +119,6 @@ class StreamsController: self.logger.info("Started stream server on port %s", self._port) - async def serve_control(self, request: web.Request): - """Server player control endpoint.""" - queue_id = request.match_info["queue_id"] - control = request.match_info["control"] - if queue := self.mass.players.get_player_queue(queue_id): - if control == "next" and queue.signal_next is None: - await queue.next() - - resp = web.StreamResponse( - status=200, reason="OK", headers={"Content-Type": "audio/wav"} - ) - await resp.prepare(request) - if request.method == "GET": - # service 1 second of silence while player is processing request - async for chunk in get_silence(1, ContentType.WAV): - await resp.write(chunk) - return resp - async def serve_preview(self, request: web.Request): """Serve short preview sample.""" provider_id = request.query["provider_id"] @@ -354,7 +331,7 @@ class QueueStream: self.done = asyncio.Event() self.all_clients_connected = asyncio.Event() self.index_in_buffer = start_index - self.signal_next: bool = False + self.signal_next: Optional[int] = None self._runner_task: Optional[asyncio.Task] = None self._prev_chunk: bytes = b"" if queue.settings.metadata_mode == MetadataMode.LEGACY: diff --git a/music_assistant/models/player_queue.py b/music_assistant/models/player_queue.py index 0757b5d4..ff5ae60d 100644 --- a/music_assistant/models/player_queue.py +++ b/music_assistant/models/player_queue.py @@ -69,13 +69,12 @@ class PlayerQueue: self.mass = mass self.logger = mass.players.logger self.queue_id = player_id - self.signal_next: Optional[int] = 0 self._stream_id: str = "" self._settings = QueueSettings(self) self._current_index: Optional[int] = None self._current_item_elapsed_time: int = 0 self._prev_item: Optional[QueueItem] = None - self._last_player_state: Tuple[str, str] = ("", "") + self._last_player_state: Tuple[PlayerState, str] = (PlayerState.OFF, "") self._items: List[QueueItem] = [] self._save_task: TimerHandle = None self._last_player_update: int = 0 @@ -377,7 +376,8 @@ class PlayerQueue: if self.announcement_in_progress: self.logger.warning("Ignore queue command: An announcement is in progress") return - self.signal_next = None + if stream := self.stream: + stream.signal_next = None # redirect to underlying player await self.player.stop() @@ -408,7 +408,7 @@ class PlayerQueue: async def next(self) -> None: """Play the next track in the queue.""" - next_index = self.get_next_index(self._current_index) + next_index = self.get_next_index(self._current_index, True) if next_index is None: return None await self.play_index(next_index) @@ -441,23 +441,26 @@ class PlayerQueue: await self.player.play_url(last_player_url) return resume_item = self.current_item + next_item = self.next_item resume_pos = self._current_item_elapsed_time if ( resume_item + and next_item and resume_item.duration and resume_pos > (resume_item.duration * 0.9) ): # track is already played for > 90% - skip to next - resume_item = self.next_item + resume_item = next_item + resume_pos = 0 + elif self._current_index is None and len(self._items) > 0: + # items available in queue but no previous track, start at 0 + resume_item = self.get_item(0) resume_pos = 0 if resume_item is not None: resume_pos = resume_pos if resume_pos > 10 else 0 fade_in = resume_pos > 0 await self.play_index(resume_item.item_id, resume_pos, fade_in) - elif len(self._items) > 0: - # items available in queue but no previous track, start at 0 - await self.play_index(0) else: self.logger.warning( "resume queue requested for %s but queue is empty", self.queue_id @@ -492,7 +495,7 @@ class PlayerQueue: self._current_index = self._snapshot.index self._current_item_elapsed_time = self._snapshot.position self._last_player_state = ( - self._snapshot.state.value, + self._snapshot.state, self._snapshot.player_url, ) if self._snapshot.state in (PlayerState.PLAYING, PlayerState.PAUSED): @@ -519,6 +522,9 @@ class PlayerQueue: if self.announcement_in_progress: self.logger.warning("Ignore queue command: An announcement is in progress") return + if stream := self.stream: + # make sure that the previous stream is not auto restarted (race condition) + stream.signal_next = None if not isinstance(index, int): index = self.index_by_id(index) if index is None: @@ -652,39 +658,53 @@ class PlayerQueue: def on_player_update(self) -> None: """Call when player updates.""" - cur_player_state = (self.player.state.value, self.player.current_url) - if self._last_player_state != cur_player_state: - # playback state changed + prev_state = self._last_player_state + new_state = (self.player.state, self.player.current_url) + + # handle PlayerState changed + if new_state[0] != prev_state[0]: + + # store previous state if self.announcement_in_progress: # while announcement in progress dont update the last url # to allow us to resume from 3rd party sources # https://github.com/music-assistant/hass-music-assistant/issues/697 - self._last_player_state = ( - cur_player_state[0], - self._last_player_state[1], - ) + self._last_player_state = (new_state[0], prev_state[1]) else: - self._last_player_state = cur_player_state - - # always signal update if playback state changed - self.signal_update() - if self.player.state == PlayerState.IDLE: + self._last_player_state = new_state + + # the queue stream was aborted on purpose and needs to restart + if ( + prev_state[0] == PlayerState.PLAYING + and new_state[0] == PlayerState.IDLE + and self.stream + and self.stream.signal_next is not None + ): + # the queue stream was aborted on purpose (e.g. because of sample rate mismatch) + # we need to restart the stream with the next index + self._current_item_elapsed_time = 0 + self.mass.create_task(self.play_index(self.stream.signal_next)) + return - # handle case where stream stopped on purpose and we need to restart it - if self.signal_next is not None: - self._current_item_elapsed_time = 0 - next_idx = self.signal_next - self.signal_next = None - self.mass.create_task(self.play_index(next_idx)) + # queue exhausted or player turned off/stopped + if ( + self.stream + and new_state[0] in (PlayerState.IDLE, PlayerState.OFF) + or not self.player.available + ): + self.stream.signal_next = None + # handle last track of the queue, set the index to index that is out of range + if self._current_index >= (len(self._items) - 1): + self._current_index += 1 - self.update_state() + # always signal update if the PlayerState changed + if new_state != prev_state: + self.signal_update() - def update_state(self) -> None: - """Update queue details, called when player updates.""" - if self.player.active_queue != self: - return - if not self.active: + # update queue details if we're the active queue for the attached player + if self.player.active_queue != self or not self.active: return + new_index = self._current_index track_time = self._current_item_elapsed_time new_item_loaded = False @@ -738,10 +758,10 @@ class PlayerQueue: self._current_index = start_index return stream - def get_next_index(self, cur_index: Optional[int]) -> int: + def get_next_index(self, cur_index: Optional[int], is_skip: bool = False) -> int: """Return the next index for the queue, accounting for repeat settings.""" # handle repeat single track - if self.settings.repeat_mode == RepeatMode.ONE: + if self.settings.repeat_mode == RepeatMode.ONE and not is_skip: return cur_index # handle repeat all if ( @@ -759,16 +779,35 @@ class PlayerQueue: def signal_update(self, items_changed: bool = False) -> None: """Signal state changed of this queue.""" if items_changed: - self.mass.create_task(self._save_items()) self.mass.signal_event( MassEvent( EventType.QUEUE_ITEMS_UPDATED, object_id=self.queue_id, data=self ) ) + # save items + self.mass.create_task( + self.mass.cache.set( + f"queue.{self.queue_id}.items", + [x.to_dict() for x in self._items], + ) + ) + # always send the base event self.mass.signal_event( MassEvent(EventType.QUEUE_UPDATED, object_id=self.queue_id, data=self) ) + # save state + self.mass.create_task( + self.mass.database.set_setting( + f"queue.{self.queue_id}.current_index", self._current_index + ) + ) + self.mass.create_task( + self.mass.database.set_setting( + f"queue.{self.queue_id}.current_item_elapsed_time", + self._current_item_elapsed_time, + ) + ) def to_dict(self) -> Dict[str, Any]: """Export object to dict.""" @@ -833,31 +872,25 @@ class PlayerQueue: async def _restore_items(self) -> None: """Try to load the saved state from cache.""" - if queue_cache := await self.mass.cache.get(f"queue_items.{self.queue_id}"): + if queue_cache := await self.mass.cache.get(f"queue.{self.queue_id}.items"): try: - self._items = [QueueItem.from_dict(x) for x in queue_cache["items"]] - self._current_index = queue_cache["current_index"] - self._current_item_elapsed_time = queue_cache.get( - "current_item_elapsed_time", 0 - ) + self._items = [QueueItem.from_dict(x) for x in queue_cache] except (KeyError, AttributeError, TypeError) as err: self.logger.warning( "Unable to restore queue state for queue %s", self.queue_id, exc_info=err, ) - await self.settings.restore() + else: + # restore state too + db_key = f"queue.{self.queue_id}.current_index" + if db_value := await self.mass.database.get_setting(db_key): + self._current_index = int(db_value) + db_key = f"queue.{self.queue_id}.current_item_elapsed_time" + if db_value := await self.mass.database.get_setting(db_key): + self._current_item_elapsed_time = int(db_value) - async def _save_items(self) -> None: - """Save current queue items/state in cache.""" - await self.mass.cache.set( - f"queue_items.{self.queue_id}", - { - "items": [x.to_dict() for x in self._items], - "current_index": self._current_index, - "current_item_elapsed_time": self._current_item_elapsed_time, - }, - ) + await self.settings.restore() async def _wait_for_state( self, -- 2.34.1