from music_assistant.models.event import MassEvent
from .enums import EventType, MediaType, ProviderType
-from .media_items import MediaItemType
+from .media_items import MediaItemType, media_from_dict
if TYPE_CHECKING:
from music_assistant.mass import MusicAssistant
f"{prov.type.value}.search.{self.media_type.value}.{search_query}.{limit}"
)
if cache := await self.mass.cache.get(cache_key):
- return [self.media_type.from_dict(x) for x in cache]
+ return [media_from_dict(x) for x in cache]
# no items in cache - get listing from provider
items = await prov.search(
search_query,
async def append(self, queue_items: List[QueueItem]) -> None:
"""Append new items at the end of the queue."""
- cur_index = self._current_index or 0
for index, item in enumerate(queue_items):
item.sort_index = len(self.items) + index
if self.settings.shuffle_enabled:
# if shuffle is enabled we shuffle the remaining tracks and the new ones
- played_items = self.items[:cur_index]
- next_items = self.items[cur_index + 1 :] + queue_items
- next_items = random.sample(next_items, len(next_items))
- if self.current_item:
- queue_items = played_items + [self.current_item] + next_items
+ cur_index = self.index_in_buffer or self._current_index
+ if cur_index is None:
+ played_items = []
+ next_items = self.items + queue_items
+ cur_item = []
else:
- queue_items = played_items + next_items
+ played_items = self.items[:cur_index] if cur_index is not None else []
+ next_items = self.items[cur_index + 1 :] + queue_items
+ cur_item = [self.get_item(cur_index)]
+ # do the shuffle
+ next_items = random.sample(next_items, len(next_items))
+ queue_items = played_items + cur_item + next_items
else:
queue_items = self._items + queue_items
await self._update_items(queue_items)