tracks: list[MediaItemType] = []
radio_source: list[MediaItemType] = []
for item in media:
- # parse provided uri into a MA MediaItem or Basic QueueItem from URL
- if isinstance(item, str):
- try:
+ try:
+ # parse provided uri into a MA MediaItem or Basic QueueItem from URL
+ if isinstance(item, str):
media_item = await self.mass.music.get_item_by_uri(item)
- except MusicAssistantError as err:
- # invalid MA uri or item not found error
- msg = f"Invalid uri: {item}"
- raise MediaNotFoundError(msg) from err
- elif isinstance(item, dict):
- media_item = media_from_dict(item)
- else:
- media_item = item
-
- # handle default enqueue option if needed
- if option is None:
- option = QueueOption(
- await self.mass.config.get_core_config_value(
- self.domain,
- f"default_enqueue_action_{media_item.media_type.value}",
+ elif isinstance(item, dict):
+ media_item = media_from_dict(item)
+ else:
+ media_item = item
+
+ # handle default enqueue option if needed
+ if option is None:
+ option = QueueOption(
+ await self.mass.config.get_core_config_value(
+ self.domain,
+ f"default_enqueue_action_{media_item.media_type.value}",
+ )
)
- )
- if option == QueueOption.REPLACE:
- self.clear(queue_id)
-
- # collect tracks to play
- ctrl = self.mass.music.get_controller(media_item.media_type)
- if radio_mode:
- radio_source.append(media_item)
- elif media_item.media_type == MediaType.PLAYLIST:
- async for playlist_track in ctrl.tracks(media_item.item_id, media_item.provider):
- tracks.append(playlist_track)
- await asyncio.sleep(0) # yield to eventloop
- await self.mass.music.mark_item_played(
- media_item.media_type, media_item.item_id, media_item.provider
- )
- elif media_item.media_type == MediaType.ARTIST:
- tracks += await self.get_artist_tracks(media_item)
- await self.mass.music.mark_item_played(
- media_item.media_type, media_item.item_id, media_item.provider
- )
- elif media_item.media_type == MediaType.ALBUM:
- tracks += await self.get_album_tracks(media_item)
- await self.mass.music.mark_item_played(
- media_item.media_type, media_item.item_id, media_item.provider
- )
- else:
- # single track or radio item
- tracks += [media_item]
-
- # handle optional start item (play playlist from here feature)
- if start_item is not None:
- prev_items = []
- next_items = []
- for track in tracks:
- if next_items or track.item_id == start_item:
- next_items.append(track)
- else:
- prev_items.append(track)
- tracks = next_items + prev_items
+ if option == QueueOption.REPLACE:
+ self.clear(queue_id)
+
+ # collect tracks to play
+ ctrl = self.mass.music.get_controller(media_item.media_type)
+ if radio_mode:
+ radio_source.append(media_item)
+ elif media_item.media_type == MediaType.PLAYLIST:
+ async for playlist_track in ctrl.tracks(
+ media_item.item_id, media_item.provider
+ ):
+ tracks.append(playlist_track)
+ await asyncio.sleep(0) # yield to eventloop
+ await self.mass.music.mark_item_played(
+ media_item.media_type, media_item.item_id, media_item.provider
+ )
+ elif media_item.media_type == MediaType.ARTIST:
+ tracks += await self.get_artist_tracks(media_item)
+ await self.mass.music.mark_item_played(
+ media_item.media_type, media_item.item_id, media_item.provider
+ )
+ elif media_item.media_type == MediaType.ALBUM:
+ tracks += await self.get_album_tracks(media_item)
+ await self.mass.music.mark_item_played(
+ media_item.media_type, media_item.item_id, media_item.provider
+ )
+ else:
+ # single track or radio item
+ tracks += [media_item]
+
+ # handle optional start item (play playlist/album from here feature)
+ if start_item is not None:
+ prev_items = []
+ next_items = []
+ for track in tracks:
+ if next_items or track.item_id == start_item:
+ next_items.append(track)
+ else:
+ prev_items.append(track)
+ tracks = next_items + prev_items
+
+ except MusicAssistantError as err:
+ # invalid MA uri or item not found error
+ self.logger.warning("Skipping %s: %s", item, str(err))
# overwrite or append radio source items
if option not in (QueueOption.ADD, QueueOption.PLAY, QueueOption.NEXT):
# only add valid/available items
queue_items = [QueueItem.from_media_item(queue_id, x) for x in tracks if x and x.available]
+ if not queue_items:
+ raise MediaNotFoundError("No playable items found")
+
# load the items into the queue
if queue.state in (PlayerState.PLAYING, PlayerState.PAUSED):
cur_index = queue.index_in_buffer or 0