from music_assistant.constants import ANNOUNCE_ALERT_FILE, FALLBACK_DURATION
from music_assistant.helpers.tags import parse_tags
from music_assistant.helpers.util import try_parse_int
-from music_assistant.models.enums import EventType, MediaType, QueueOption, RepeatMode
+from music_assistant.models.enums import (
+ EventType,
+ MediaType,
+ ProviderType,
+ QueueOption,
+ RepeatMode,
+)
from music_assistant.models.errors import MediaNotFoundError, MusicAssistantError
from music_assistant.models.event import MassEvent
from music_assistant.models.media_items import MediaItemType, media_from_dict
async def play_media(
self,
media: str | List[str] | MediaItemType | List[MediaItemType],
- queue_opt: QueueOption = QueueOption.PLAY,
+ option: QueueOption = QueueOption.PLAY,
+ radio_mode: bool = False,
passive: bool = False,
) -> str:
"""
Play media item(s) on the given queue.
- :param media: media(s) that should be played (MediaItem(s) or uri's).
- :param queue_opt:
- QueueOption.PLAY -> Insert new items in queue and start playing at inserted position
- QueueOption.REPLACE -> Replace queue contents with these items
- QueueOption.NEXT -> Play item(s) after current playing item
- QueueOption.ADD -> Append new items at end of the queue
- QueueOption.RADIO -> Fill the queue contents with dynamic content based on the item(s)
- :param passive: if passive set to true the stream url will not be sent to the player.
+ media: Media(s) that should be played (MediaItem(s) or uri's).
+ queue_opt: Which enqueue mode to use.
+ radio_mode: Enable radio mode for the given item(s).
+ passive: If passive set to true the stream url will not be sent to the player.
"""
if self.announcement_in_progress:
self.logger.warning("Ignore queue command: An announcement is in progress")
if not isinstance(media, list):
media = [media]
+ # clear queue first if it was finished
+ if self._current_index and self._current_index >= (len(self._items) - 1):
+ self._current_index = None
+ self._items = []
+
+ # clear radio source items if needed
+ if option not in (QueueOption.ADD, QueueOption.PLAY, QueueOption.NEXT):
+ self._radio_source = []
+
tracks: List[MediaItemType] = []
for item in media:
# parse provided uri into a MA MediaItem or Basic QueueItem from URL
media_item = item
# collect tracks to play
- if queue_opt == QueueOption.RADIO:
- # For dynamic/radio mode, the source items are stored and unpacked dynamically
- tracks += [media_item]
- elif media_item.media_type == MediaType.ARTIST:
- tracks += await self.mass.music.artists.toptracks(
- media_item.item_id, provider=media_item.provider
- )
- elif media_item.media_type == MediaType.ALBUM:
- tracks += await self.mass.music.albums.tracks(
- media_item.item_id, provider=media_item.provider
+ ctrl = self.mass.music.get_controller(media_item.media_type)
+ if radio_mode:
+ self._radio_source.append(media_item)
+ # if radio mode enabled, grab the first batch of tracks here
+ tracks += await ctrl.dynamic_tracks(
+ item_id=media_item.item_id, provider=media_item.provider
)
- elif media_item.media_type == MediaType.PLAYLIST:
- tracks += await self.mass.music.playlists.tracks(
+ elif media_item.media_type in (
+ MediaType.ARTIST,
+ MediaType.ALBUM,
+ MediaType.PLAYLIST,
+ ):
+ tracks += await ctrl.tracks(
media_item.item_id, provider=media_item.provider
)
else:
# single track or radio item
tracks += [media_item]
- # Handle Radio playback: clear queue and request first batch
- if queue_opt == QueueOption.RADIO:
- # clear existing items before we start radio
+ # only add valid/available items
+ queue_items = [
+ QueueItem.from_media_item(x) for x in tracks if x and x.available
+ ]
+
+ # load the items into the queue
+ cur_index = self.index_in_buffer or self._current_index or 0
+ shuffle = self.settings.shuffle_enabled and len(queue_items) >= 5
+
+ # handle replace: clear all items and replace with the new items
+ if option == QueueOption.REPLACE:
await self.clear()
- # load the first batch
- await self._load_radio_tracks(tracks)
+ await self.load(queue_items, shuffle=shuffle)
if not passive:
await self.play_index(0)
- return
-
- # only add available items
- queue_items = [QueueItem.from_media_item(x) for x in tracks if x.available]
-
- # clear queue first if it was finished
- if self._current_index and self._current_index >= (len(self._items) - 1):
- self._current_index = None
- self._items = []
-
- # if adding more than 50 items in play/next mode, treat as replace
- if len(queue_items) > 50 and queue_opt in (QueueOption.PLAY, QueueOption.NEXT):
- queue_opt = QueueOption.REPLACE
+ # handle next: add item(s) in the index next to the playing/loaded/buffered index
+ elif option == QueueOption.NEXT:
+ await self.load(queue_items, insert_at_index=cur_index + 1, shuffle=shuffle)
+ elif option == QueueOption.REPLACE_NEXT:
+ await self.load(
+ queue_items,
+ insert_at_index=cur_index + 1,
+ keep_remaining=False,
+ shuffle=shuffle,
+ )
+ # handle play: replace current loaded/playing index with new item(s)
+ elif option == QueueOption.PLAY:
+ await self.load(queue_items, insert_at_index=cur_index, shuffle=shuffle)
+ if not passive:
+ await self.play_index(cur_index)
+ # handle add: add/append item(s) to the remaining queue items
+ elif option == QueueOption.ADD:
+ shuffle = self.settings.shuffle_enabled
+ if shuffle:
+ # shuffle the new items with remaining queue items
+ insert_at_index = cur_index + 1
+ else:
+ # just append at the end
+ insert_at_index = len(self._items)
+ await self.load(
+ queue_items, insert_at_index=insert_at_index, shuffle=shuffle
+ )
- # load the items into the queue
- if queue_opt == QueueOption.REPLACE:
- await self.load(queue_items, passive)
- elif queue_opt == QueueOption.NEXT:
- await self.insert(queue_items, 1, passive)
- elif queue_opt == QueueOption.PLAY:
- await self.insert(queue_items, 0, passive)
- elif queue_opt == QueueOption.ADD:
- await self.append(queue_items)
-
- async def _load_radio_tracks(
- self, radio_items: Optional[List[MediaItemType]] = None
- ) -> None:
+ async def _fill_radio_tracks(self) -> None:
"""Fill the Queue with (additional) Radio tracks."""
- if radio_items:
- self._radio_source = radio_items
assert self._radio_source, "No Radio item(s) loaded/active!"
-
tracks: List[MediaItemType] = []
# grab dynamic tracks for (all) source items
# shuffle the source items, just in case
break
# fill queue - filter out unavailable items
queue_items = [QueueItem.from_media_item(x) for x in tracks if x.available]
- await self.append(queue_items)
+ await self.load(
+ queue_items,
+ insert_at_index=len(self._items) - 1,
+ )
async def play_announcement(self, url: str, prepend_alert: bool = False) -> str:
"""
if (new_index < (self._current_index or 0)) or (new_index > len(self.items)):
return
# move the item in the list
+ # TODO: guard for position that is already played/buffered!
items.insert(new_index, items.pop(item_index))
await self.update_items(items)
self._items.pop(item_index)
self.signal_update(True)
- async def load(self, queue_items: List[QueueItem], passive: bool = False) -> None:
- """Load (overwrite) queue with new items."""
- # reset radio source if a queue load is executed
- self._radio_source = []
- for index, item in enumerate(queue_items):
- item.sort_index = index
- if self.settings.shuffle_enabled and len(queue_items) > 5:
- queue_items = random.sample(queue_items, len(queue_items))
- self._items = [x for x in queue_items if x is not None] # filter None items
- await self.play_index(0, passive=passive)
- self.signal_update(True)
-
- async def insert(
- self, queue_items: List[QueueItem], offset: int = 0, passive: bool = False
+ async def load(
+ self,
+ queue_items: List[QueueItem],
+ insert_at_index: int = 0,
+ keep_remaining: bool = True,
+ shuffle: bool = False,
) -> None:
"""
- Insert new items at offset x from current position.
+ Load new items at index.
- Keeps remaining items in queue.
- if offset 0, will start playing newly added item(s)
- :param queue_items: a list of QueueItem
- :param offset: offset from current queue position
+ queue_items: a list of QueueItem
+ insert_at_index: insert the item(s) at this index
+ keep_remaining: keep the remaining items after the insert
+ shuffle: (re)shuffle the items after insert index
"""
- if offset == 0:
- cur_index = self._current_index
- else:
- cur_index = self.index_in_buffer or self._current_index
- if not self.items or cur_index is None:
- return await self.load(queue_items, passive)
- insert_at_index = cur_index + offset
- for index, item in enumerate(queue_items):
- item.sort_index = insert_at_index + index
- if self.settings.shuffle_enabled and len(queue_items) > 5:
- queue_items = random.sample(queue_items, len(queue_items))
- if offset == 0:
- # replace current item with new
- self._items = (
- self._items[:insert_at_index]
- + queue_items
- + self._items[insert_at_index + 1 :]
- )
- else:
- self._items = (
- self._items[:insert_at_index]
- + queue_items
- + self._items[insert_at_index:]
- )
- if offset in (0, cur_index):
- await self.play_index(insert_at_index, passive=passive)
- self.signal_update(True)
+ # keep previous/played items, append the new ones
+ prev_items = self._items[:insert_at_index]
+ next_items = queue_items
- async def append(self, queue_items: List[QueueItem]) -> None:
- """Append new items at the end of the queue."""
- for index, item in enumerate(queue_items):
- item.sort_index = len(self.items) + index
- if self.settings.shuffle_enabled:
- # if shuffle is enabled we shuffle the remaining tracks and the new ones
- cur_index = self.index_in_buffer or self._current_index
- if cur_index is None:
- played_items = []
- next_items = self.items + queue_items
- cur_items = []
- else:
- played_items = self.items[:cur_index] if cur_index is not None else []
- next_items = self.items[cur_index + 1 :] + queue_items
- if cur_item := self.get_item(cur_index):
- cur_items = [cur_item]
- else:
- cur_items = []
- # do the shuffle
+ # if keep_remaining, append the old previous items
+ if keep_remaining:
+ next_items += self._items[insert_at_index:]
+
+ # we set the original insert order as attribute so we can un-shuffle
+ for index, item in enumerate(next_items):
+ item.sort_index += insert_at_index + index
+ # (re)shuffle the final batch if needed
+ if shuffle:
next_items = random.sample(next_items, len(next_items))
- queue_items = played_items + cur_items + next_items
- else:
- queue_items = self._items + queue_items
- await self.update_items(queue_items)
+ await self.update_items(prev_items + next_items)
async def clear(self) -> None:
"""Clear all items in the queue."""
# watch dynamic radio items refill if needed
fill_index = len(self._items) - 5
if self._radio_source and (new_index >= fill_index):
- self.mass.create_task(self._load_radio_tracks())
+ self.mass.create_task(self._fill_radio_tracks())
# check if a new track is loaded, wait for the streamdetails
if (
if new_item_loaded:
self.signal_update()
+ self.mass.create_task(self._fetch_full_details(self._current_index))
if abs(prev_item_time - self._current_item_elapsed_time) >= 1:
self.mass.signal_event(
MassEvent(
self._current_item_elapsed_time = try_parse_int(db_value)
await self.settings.restore()
+
+ async def _fetch_full_details(self, index: int) -> None:
+ """Background task that fetches the full details of an item in the queue."""
+ if not self._items or len(self._items) < (index + 1):
+ return
+
+ item_before = self._items[index]
+
+ # check if the details are already fetched
+ if item_before.media_item.provider == ProviderType.DATABASE:
+ return
+
+ # fetch full details here to prevent all clients do this on their own
+ full_details = await self.mass.music.get_item_by_uri(
+ self.current_item.media_item.uri, lazy=False
+ )
+ # convert to queueitem in between to minimize data
+ temp_queue_item = QueueItem.from_media_item(full_details)
+
+ # safe guard: check that item still matches
+ # prevents race condition where items changes just while we were waiting for data
+ if self._items[index].item_id != item_before.item_id:
+ return
+ self._items[index].media_item = temp_queue_item.media_item
+ self.signal_update()