From: Anatosun <33899455+anatosun@users.noreply.github.com> Date: Tue, 20 Jan 2026 17:09:14 +0000 (+0100) Subject: Plex connect: Improve queue loading performance (#2735) X-Git-Url: https://git.kitaultman.com/?a=commitdiff_plain;h=16c7fd970063a85d5f2c88a7ab6c142e6d9be739;p=music-assistant-server.git Plex connect: Improve queue loading performance (#2735) * Plex connect: Improve queue loading performance Start playback immediately with first track while loading remaining tracks in background. This reduces perceived latency when playing queues from Plex. Changes: - Load and play first track immediately when queue is requested - Fetch remaining tracks concurrently in background task - Broadcast timeline update after first track starts * Plex connect: applied copilot suggestion Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> * Plex connect: fixed task creation * Plex Connect: added await clause on line 715 --------- Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> Co-authored-by: Marvin Schenkel --- diff --git a/music_assistant/providers/plex_connect/player_remote.py b/music_assistant/providers/plex_connect/player_remote.py index a6e2d509..9c6e4b72 100644 --- a/music_assistant/providers/plex_connect/player_remote.py +++ b/music_assistant/providers/plex_connect/player_remote.py @@ -533,7 +533,7 @@ class PlexRemoteControlServer: else: LOGGER.warning("Queue not ready for seeking after timeout") - async def _play_from_plex_queue( # noqa: PLR0915 + async def _play_from_plex_queue( self, player_id: str, container_key: str, @@ -541,7 +541,11 @@ class PlexRemoteControlServer: shuffle: bool, offset: int, ) -> None: - """Fetch play queue from Plex and load tracks.""" + """Fetch play queue from Plex and load tracks. + + Starts playback immediately with the first track, + then loads remaining tracks in the background. + """ try: LOGGER.info(f"Fetching play queue: {container_key}") @@ -565,75 +569,61 @@ class PlexRemoteControlServer: # Track play queue item IDs self.play_queue_item_ids = {} - tracks_to_queue: list[object] = [] - start_index = None - - for plex_idx, item in enumerate(playqueue.items): - track_key = item.key if hasattr(item, "key") else None - play_queue_item_id = ( - item.playQueueItemID if hasattr(item, "playQueueItemID") else None - ) - if track_key: - try: - # Fetch track from MA - track = await self.provider.get_track(track_key) - ma_idx = len(tracks_to_queue) - tracks_to_queue.append(track) - - # Store play queue item ID mapping - if play_queue_item_id: - self.play_queue_item_ids[ma_idx] = play_queue_item_id - - # Check if this is the track at the selected offset - if plex_idx == selected_offset: - start_index = ma_idx - LOGGER.info( - f"Start track at offset {selected_offset}: {track.name}" - ) - except Exception as e: - LOGGER.debug(f"Could not fetch track {track_key}: {e}") - continue - - if tracks_to_queue: - LOGGER.info( - f"Loaded queue with {len(tracks_to_queue)} tracks, " - f"starting at offset {selected_offset} (MA index {start_index})" - ) + # Fetch the first track to start playback immediately + first_item = ( + playqueue.items[selected_offset] + if selected_offset < len(playqueue.items) + else playqueue.items[0] + ) + first_track_key = first_item.key if hasattr(first_item, "key") else None + first_play_queue_item_id = ( + first_item.playQueueItemID if hasattr(first_item, "playQueueItemID") else None + ) - # Reorder tracks if not starting from the first track - if start_index is not None and start_index > 0: - tracks_to_queue, self.play_queue_item_ids = ( - self._reorder_tracks_for_playback(tracks_to_queue, start_index) + if not first_track_key: + LOGGER.error("No valid first track in play queue") + if starting_key: + track = await self.provider.get_track(starting_key) + await self.provider.mass.player_queues.play_media( + queue_id=player_id, + media=track, + option=QueueOption.REPLACE, ) + return + + # Fetch and start playing the first track immediately + try: + first_track = await self.provider.get_track(first_track_key) + LOGGER.info(f"Starting playback with first track: {first_track.name}") + + # Store first track's play queue item ID mapping + if first_play_queue_item_id: + self.play_queue_item_ids[0] = first_play_queue_item_id - # Queue all tracks + # Start playback immediately with just the first track await self.provider.mass.player_queues.play_media( queue_id=player_id, - media=tracks_to_queue, # type: ignore[arg-type] + media=first_track, option=QueueOption.REPLACE, ) - # Update tracked state to prevent sync loop - # Store the keys in the order they're in MA queue (after reordering) - synced_keys = [] - for track in tracks_to_queue: # type: ignore[assignment] - for mapping in track.provider_mappings: - if mapping.provider_instance == self.provider.instance_id: - synced_keys.append(mapping.item_id) - break - self._last_synced_ma_queue_length = len(synced_keys) - self._last_synced_ma_queue_keys = synced_keys - - # Apply shuffle if requested - if shuffle: - await self.provider.mass.player_queues.set_shuffle(player_id, shuffle) - - # Seek to offset if specified + # Seek to offset if specified (do this before loading remaining tracks) if offset > 0: await self._seek_to_offset_after_playback(player_id, offset) - else: - LOGGER.error("No valid tracks in play queue") + + # Broadcast timeline update immediately + await self._broadcast_timeline() + + # Now load the remaining tracks in the background + self.provider.mass.create_task( + self._load_remaining_queue_tracks( + player_id, playqueue, selected_offset, shuffle + ) + ) + + except Exception as e: + LOGGER.exception(f"Error starting playback with first track: {e}") # Fall back to single track if starting_key: track = await self.provider.get_track(starting_key) @@ -664,6 +654,112 @@ class PlexRemoteControlServer: option=QueueOption.REPLACE, ) + async def _load_remaining_queue_tracks( + self, + player_id: str, + playqueue: PlayQueue, + selected_offset: int, + shuffle: bool, + ) -> None: + """Load remaining tracks from play queue in the background. + + :param player_id: The Music Assistant player ID. + :param playqueue: The Plex play queue. + :param selected_offset: The offset of the track that's already playing. + :param shuffle: Whether shuffle is enabled. + """ + try: + # Prepare to fetch all tracks except the first one + remaining_items = [] + + # Get items after selected track + for i in range(selected_offset + 1, len(playqueue.items)): + remaining_items.append((i, playqueue.items[i])) + + # Get items before selected track (these will be added at the end) + for i in range(selected_offset): + remaining_items.append((i, playqueue.items[i])) + + if not remaining_items: + LOGGER.debug("No remaining tracks to load") + return + + # Fetch all remaining tracks concurrently + async def fetch_track( + plex_idx: int, item: Any + ) -> tuple[int, object | None, int | None]: + """Fetch a single track from Plex.""" + track_key = item.key if hasattr(item, "key") else None + play_queue_item_id = ( + item.playQueueItemID if hasattr(item, "playQueueItemID") else None + ) + + if track_key: + try: + track = await self.provider.get_track(track_key) + return plex_idx, track, play_queue_item_id + except Exception as e: + LOGGER.debug(f"Could not fetch track {track_key}: {e}") + + return plex_idx, None, None + + # Fetch all tracks in parallel + fetch_tasks = [fetch_track(idx, item) for idx, item in remaining_items] + results = await asyncio.gather(*fetch_tasks, return_exceptions=True) + + # Process results and build track list + tracks_to_add: list[object] = [] + for result in results: + if isinstance(result, Exception): + LOGGER.debug(f"Error fetching track: {result}") + continue + + # result is guaranteed to be a tuple here after the Exception check + _plex_idx, track, play_queue_item_id = result # type: ignore[misc] + if track: + ma_idx = len(tracks_to_add) + 1 # +1 because first track is already queued + tracks_to_add.append(track) + + # Store play queue item ID mapping + if play_queue_item_id: + self.play_queue_item_ids[ma_idx] = play_queue_item_id + + if tracks_to_add: + LOGGER.info(f"Adding {len(tracks_to_add)} remaining tracks to queue") + + # Add remaining tracks to the queue + await self.provider.mass.player_queues.play_media( + queue_id=player_id, + media=tracks_to_add, # type: ignore[arg-type] + option=QueueOption.ADD, + ) + + # Update tracked state to prevent sync loop + queue_items = self.provider.mass.player_queues.items(player_id) + synced_keys = [] + for item in queue_items: + if item.media_item: + for mapping in item.media_item.provider_mappings: + if mapping.provider_instance == self.provider.instance_id: + synced_keys.append(mapping.item_id) + break + self._last_synced_ma_queue_length = len(synced_keys) + self._last_synced_ma_queue_keys = synced_keys + + # Apply shuffle if requested (after all tracks are loaded) + if shuffle: + await self.provider.mass.player_queues.set_shuffle(player_id, shuffle) + + LOGGER.info( + f"Successfully loaded {len(tracks_to_add)} remaining tracks " + f"(total queue: {len(synced_keys)} tracks)" + ) + else: + LOGGER.warning("No valid remaining tracks found in play queue") + + except Exception as e: + LOGGER.exception(f"Error loading remaining queue tracks: {e}") + async def _replace_entire_queue(self, player_id: str, playqueue: PlayQueue) -> None: """Replace the entire queue when nothing is currently playing. @@ -905,53 +1001,56 @@ class PlexRemoteControlServer: f"Created play queue {self.play_queue_id} with {len(playqueue.items)} items" ) - # Load tracks from the created queue + # Fetch the first track to start playback immediately self.play_queue_item_ids = {} - tracks_to_queue = [] + first_item = playqueue.items[0] + first_track_key = first_item.key if hasattr(first_item, "key") else None + first_play_queue_item_id = ( + first_item.playQueueItemID if hasattr(first_item, "playQueueItemID") else None + ) - for i, item in enumerate(playqueue.items): - track_key = item.key if hasattr(item, "key") else None - play_queue_item_id = ( - item.playQueueItemID if hasattr(item, "playQueueItemID") else None - ) + if not first_track_key: + LOGGER.error("No valid first track in created play queue") + return web.Response(status=500, text="Failed to load tracks from play queue") + + try: + # Fetch and start playing the first track immediately + first_track = await self.provider.get_track(first_track_key) + LOGGER.info(f"Starting playback with first track: {first_track.name}") - if track_key: - try: - # Fetch track from MA - track = await self.provider.get_track(track_key) - tracks_to_queue.append(track) - - # Store play queue item ID mapping - if play_queue_item_id: - self.play_queue_item_ids[len(tracks_to_queue) - 1] = ( - play_queue_item_id - ) - except Exception as e: - LOGGER.debug(f"Could not fetch track {track_key}: {e}") - continue - - if tracks_to_queue: - # Queue all tracks + # Store first track's play queue item ID mapping + if first_play_queue_item_id: + self.play_queue_item_ids[0] = first_play_queue_item_id + + # Start playback immediately with just the first track await self.provider.mass.player_queues.play_media( queue_id=player_id, - media=tracks_to_queue, # type: ignore[arg-type] + media=first_track, option=QueueOption.REPLACE, ) - # Apply shuffle if requested (Plex may have already shuffled server-side) - if shuffle: - await self.provider.mass.player_queues.set_shuffle(player_id, shuffle) - else: - LOGGER.error("No valid tracks in created play queue") - return web.Response(status=500, text="Failed to load tracks from play queue") + # Now load the remaining tracks in the background + if len(playqueue.items) > 1: + self.provider.mass.create_task( + self._load_remaining_queue_tracks( + player_id, + playqueue, + 0, # Selected offset is 0 since we started from the first track + shuffle, + ) + ) + + # Broadcast timeline update + await self._broadcast_timeline() + return web.Response(status=200) + + except Exception as e: + LOGGER.exception(f"Error starting playback with first track: {e}") + return web.Response(status=500, text=f"Failed to start playback: {e}") else: LOGGER.error("Failed to create play queue or queue is empty") return web.Response(status=500, text="Failed to create play queue") - # Broadcast timeline update - await self._broadcast_timeline() - return web.Response(status=200) - except Exception as e: LOGGER.exception(f"Error handling createPlayQueue: {e}") return web.Response(status=500, text=str(e))