from music_assistant.common.helpers.datetime import utc_timestamp
from music_assistant.common.helpers.json import serialize_to_json
from music_assistant.common.models.enums import EventType, ProviderFeature
-from music_assistant.common.models.errors import (
- MediaNotFoundError,
- UnsupportedFeaturedException,
-)
+from music_assistant.common.models.errors import MediaNotFoundError, UnsupportedFeaturedException
from music_assistant.common.models.media_items import (
Album,
AlbumType,
# make sure we have a full track
if isinstance(ref_track.album, ItemMapping):
try:
- maybe_ref_track = await self.mass.music.tracks.get_provider_item(
+ ref_track = await self.mass.music.tracks.get_provider_item( # noqa: PLW2901
ref_track.item_id, ref_track.provider
)
except MediaNotFoundError:
continue
- provider_ref_track = maybe_ref_track or ref_track
+ provider_ref_track = ref_track
for search_str in (
f"{db_artist.name} - {provider_ref_track.name}",
f"{db_artist.name} {provider_ref_track.name}",
if queue.current_index and queue.current_index >= (len(self._queue_items[queue_id]) - 1):
queue.current_index = None
self._queue_items[queue_id] = []
+ # clear queue if needed
+ if option == QueueOption.REPLACE:
+ self.clear(queue_id)
tracks: list[MediaItemType] = []
radio_source: list[MediaItemType] = []
f"default_enqueue_action_{media_item.media_type.value}",
)
)
- if option == QueueOption.REPLACE_NEXT and queue.state not in (
- PlayerState.PLAYING,
- PlayerState.PAUSED,
- ):
- # replace next requested but nothing is playing
- option = QueueOption.REPLACE
- # clear queue if needed
- if option == QueueOption.REPLACE:
- self.clear(queue_id)
+ if option == QueueOption.REPLACE:
+ self.clear(queue_id)
# collect tracks to play
ctrl = self.mass.music.get_controller(media_item.media_type)
cur_index = queue.index_in_buffer or 0
else:
cur_index = queue.current_index or 0
+ insert_at_index = cur_index + 1 if self._queue_items.get(queue_id) else 0
shuffle = queue.shuffle_enabled and len(queue_items) > 1
# handle replace: clear all items and replace with the new items
self.load(
queue_id,
queue_items=queue_items,
- insert_at_index=cur_index + 1,
+ insert_at_index=insert_at_index,
shuffle=shuffle,
)
return
self.load(
queue_id,
queue_items=queue_items,
- insert_at_index=cur_index + 1,
+ insert_at_index=insert_at_index,
keep_remaining=False,
shuffle=shuffle,
)
self.load(
queue_id,
queue_items=queue_items,
- insert_at_index=cur_index + 1,
+ insert_at_index=insert_at_index,
shuffle=shuffle,
)
- next_index = min(cur_index + 1, len(self._queue_items[queue_id]) - 1)
+ next_index = min(insert_at_index, len(self._queue_items[queue_id]) - 1)
await self.play_index(queue_id, next_index)
return
# handle add: add/append item(s) to the remaining queue items
if option == QueueOption.ADD:
- if queue.shuffle_enabled:
- # shuffle the new items with remaining queue items
- insert_at_index = cur_index + 1
- else:
- # just append at the end
- insert_at_index = len(self._queue_items[queue_id])
self.load(
queue_id=queue_id,
queue_items=queue_items,
- insert_at_index=insert_at_index,
+ insert_at_index=insert_at_index
+ if queue.shuffle_enabled
+ else len(self._queue_items[queue_id]),
shuffle=queue.shuffle_enabled,
)
# handle edgecase, queue is empty and items are only added (not played)