async def _set_sonos_queue_from_mass_queue(self, queue_id: str) -> None:
"""Set the SonosQueue items from the given MA PlayerQueue."""
- items = []
+ items: list[PlayerMedia] = []
queue = self.mass.player_queues.get(queue_id)
if not queue:
self.sonos_queue.items.clear()
return
current_index = queue.current_index or 0
+
+ # Add a few items before the current index for context
offset = max(0, current_index - 4)
- queue_items = self.mass.player_queues.items(queue_id=queue_id, offset=offset, limit=10)
- for item in queue_items:
- if not item.available:
- continue
- media = await self.mass.player_queues.player_media_from_queue_item(item, False)
+ for idx in range(offset, current_index):
+ if queue_item := self.mass.player_queues.get_item(queue_id, idx):
+ if queue_item.available:
+ media = await self.mass.player_queues.player_media_from_queue_item(
+ queue_item, False
+ )
+ items.append(media)
+
+ # Add the current item
+ if current_item := self.mass.player_queues.get_item(queue_id, current_index):
+ if current_item.available:
+ media = await self.mass.player_queues.player_media_from_queue_item(
+ current_item, False
+ )
+ items.append(media)
+
+ # Use get_next_item to fetch next items, which accounts for repeat mode
+ last_index: int | str = current_index
+ for _ in range(5):
+ next_item = self.mass.player_queues.get_next_item(queue_id, last_index)
+ if next_item is None:
+ break
+ media = await self.mass.player_queues.player_media_from_queue_item(next_item, False)
items.append(media)
+ last_index = next_item.queue_item_id
+
self.sonos_queue.items = items
self.logger.debug(
"Set Sonos queue items from MA queue %s: %s",