MediaNotFoundError,
ProviderUnavailableError,
)
-from music_assistant_models.media_items import Playlist, Track
+from music_assistant_models.media_items import (
+ Playlist,
+ Track,
+)
-from music_assistant.constants import DB_TABLE_PLAYLISTS
+from music_assistant.constants import (
+ DB_TABLE_PLAYLISTS,
+ PLAYLIST_MEDIA_TYPES,
+ PlaylistPlayableItem,
+)
from music_assistant.helpers.compare import create_safe_string
from music_assistant.helpers.database import UNSET
from music_assistant.helpers.json import serialize_to_json
item_id: str,
provider_instance_id_or_domain: str,
force_refresh: bool = False,
- ) -> AsyncGenerator[Track, None]:
+ ) -> AsyncGenerator[PlaylistPlayableItem, None]:
"""Return playlist tracks for the given provider playlist id."""
if provider_instance_id_or_domain == "library":
library_item = await self.get_library_item(item_id)
if track.uri is not None:
unwrapped_uris.append(track.uri)
elif media_type == MediaType.PLAYLIST:
- async for track in self.tracks(item_id, provider_instance_id_or_domain):
- if track.uri is not None:
- unwrapped_uris.append(track.uri)
- elif media_type == MediaType.TRACK:
+ async for item in self.tracks(item_id, provider_instance_id_or_domain):
+ if item.uri is not None:
+ unwrapped_uris.append(item.uri)
+ elif media_type in PLAYLIST_MEDIA_TYPES:
unwrapped_uris.append(uri)
else:
self.logger.warning(
- "Not adding %s to playlist %s - not a track", uri, playlist.name
+ "Not adding %s to playlist %s - media type not supported in playlists",
+ uri,
+ playlist.name,
)
continue
# parse uri for further processing
media_type, provider_instance_id_or_domain, item_id = await parse_uri(uri)
+ # non-track items can only be added to builtin playlists
+ if media_type != MediaType.TRACK and playlist_prov.domain != "builtin":
+ self.logger.warning(
+ "Not adding %s to playlist %s - only supported in builtin playlists",
+ uri,
+ playlist.name,
+ )
+ continue
+
# skip if item already in the playlist
if item_id in cur_playlist_track_ids:
self.logger.warning(
continue
# special: the builtin provider can handle uri's from all providers (with uri as id)
- if provider_instance_id_or_domain != "library" and playlist_prov.domain == "builtin":
- # note: we try not to add library uri's to the builtin playlists
- # so we can survive db rebuilds
- if uri not in ids_to_add:
- ids_to_add.append(uri)
- self.logger.info(
- "Adding %s to playlist %s",
- uri,
- playlist.name,
- )
+ if playlist_prov.domain == "builtin":
+ # For non-library URIs, add directly (they're already portable provider URIs)
+ if provider_instance_id_or_domain != "library":
+ if uri not in ids_to_add:
+ ids_to_add.append(uri)
+ self.logger.info(
+ "Adding %s to playlist %s",
+ uri,
+ playlist.name,
+ )
+ continue
+ # For library URIs, convert to provider URIs to survive DB rebuilds
+ # Get the full item from library to access all provider mappings
+ full_item = await self.mass.music.get_item_by_uri(uri)
+ if not hasattr(full_item, "provider_mappings"):
+ self.logger.warning(
+ "Can't add %s to playlist %s - unsupported media type",
+ uri,
+ playlist.name,
+ )
+ continue
+
+ # For tracks, try to match to playlist provider
+ # For non-track items, just use first available mapping
+ provider_mappings = full_item.provider_mappings
+ if media_type == MediaType.TRACK:
+ # Cast to Track for mypy - we know it's a track from media_type check
+ full_track = cast("Track", full_item)
+ # Try to match the track to additional providers
+ track_prov_domains = {x.provider_domain for x in provider_mappings}
+ if (
+ playlist_prov.is_streaming_provider
+ and playlist_prov.domain not in track_prov_domains
+ ):
+ provider_mappings.update(
+ await self.mass.music.tracks.match_provider(
+ full_track, playlist_prov, strict=False
+ )
+ )
+
+ # Sort by quality (highest first) for deterministic selection
+ provider_mappings = sorted(provider_mappings, key=lambda x: x.quality, reverse=True)
+
+ # Add first available provider mapping
+ for prov_mapping in provider_mappings:
+ if not prov_mapping.available:
+ continue
+ item_prov = self.mass.get_provider(prov_mapping.provider_instance)
+ if not item_prov:
+ continue
+ # Create provider URI from the mapping
+ provider_uri = create_uri(
+ media_type,
+ item_prov.instance_id,
+ prov_mapping.item_id,
+ )
+ if (
+ provider_uri not in ids_to_add
+ and provider_uri not in cur_playlist_track_uris
+ ):
+ ids_to_add.append(provider_uri)
+ self.logger.info(
+ "Adding %s to playlist %s",
+ provider_uri,
+ playlist.name,
+ )
+ break
+ else:
+ self.logger.warning(
+ "Can't add %s to playlist %s - no available provider mapping",
+ uri,
+ playlist.name,
+ )
continue
# if target playlist is an exact provider match, we can add it
ids_to_add.append(item_id)
continue
- # ensure we have a full (library) track (including all provider mappings)
+ # For provider-specific playlists: match tracks with quality sorting
+ # (Non-track items can only be added to builtin playlists, validated earlier)
full_track = await self.mass.music.tracks.get(
item_id,
provider_instance_id_or_domain,
playlist.name,
)
break # already existing in the playlist
- if playlist_prov.domain == "builtin":
- # the builtin provider can handle uri's from all providers (with uri as id)
- if track_version_uri not in ids_to_add:
- ids_to_add.append(track_version_uri)
- self.logger.info(
- "Adding %s to playlist %s",
- full_track.name,
- playlist.name,
- )
- break
+ # Add track to provider-specific playlist
if item_prov.instance_id == playlist_prov.instance_id:
if track_version.item_id not in ids_to_add:
ids_to_add.append(track_version.item_id)
provider_instance_id_or_domain: str,
page: int = 0,
force_refresh: bool = False,
- ) -> list[Track]:
+ ) -> list[PlaylistPlayableItem]:
"""Return playlist tracks for the given provider playlist id."""
assert provider_instance_id_or_domain != "library"
if not (provider := self.mass.get_provider(provider_instance_id_or_domain)):
return []
provider = cast("MusicProvider", provider)
async with self.mass.cache.handle_refresh(force_refresh):
- return await provider.get_playlist_tracks(item_id, page=page)
+ # Builtin provider overrides to return list[PlaylistPlayableItem],
+ # others return list[Track]. Since Track is part of PlaylistPlayableItem union,
+ # this is safe at runtime. Type ignore needed because list is invariant.
+ return await provider.get_playlist_tracks(item_id, page=page) # type: ignore[return-value]
async def radio_mode_base_tracks(
self,
return [
x
async for x in self.tracks(item.item_id, item.provider)
- # filter out unavailable tracks
- if x.available
+ # Radio mode only works with Tracks (filter out all other types)
+ if isinstance(x, Track) and x.available
]
async def match_providers(self, db_item: Playlist) -> None:
import os
import time
from collections.abc import AsyncGenerator
-from typing import TYPE_CHECKING, Final, cast
+from typing import TYPE_CHECKING, Final, cast, get_args
import aiofiles
import shortuuid
)
from music_assistant_models.streamdetails import StreamDetails
-from music_assistant.constants import MASS_LOGO, VARIOUS_ARTISTS_FANART
+from music_assistant.constants import (
+ MASS_LOGO,
+ VARIOUS_ARTISTS_FANART,
+ PlaylistPlayableItem,
+)
from music_assistant.controllers.cache import use_cache
from music_assistant.helpers.tags import AudioTags, async_parse_tags
from music_assistant.helpers.uri import parse_uri
CACHE_CATEGORY_MEDIA_INFO: Final[int] = 1
CACHE_CATEGORY_PLAYLISTS: Final[int] = 2
-
SUPPORTED_FEATURES = {
ProviderFeature.BROWSE,
ProviderFeature.LIBRARY_TRACKS,
self.mass.config.set(key, stored_items)
return True
- async def get_playlist_tracks(self, prov_playlist_id: str, page: int = 0) -> list[Track]:
- """Get playlist tracks."""
+ async def get_playlist_tracks( # type: ignore[override]
+ self, prov_playlist_id: str, page: int = 0
+ ) -> list[PlaylistPlayableItem]:
+ """Get playlist tracks.
+
+ Builtin provider supports Track, Radio, PodcastEpisode, and Audiobook items in playlists.
+ Overrides base class to return extended union type instead of list[Track].
+ """
if page > 0:
# paging not supported, we always return the whole list at once
return []
if prov_playlist_id in BUILTIN_PLAYLISTS:
- return await self._get_builtin_playlist_tracks(prov_playlist_id)
- # user created universal playlist
- result: list[Track] = []
+ # System-generated playlists (favorites, random, etc.) only contain tracks
+ return list(await self._get_builtin_playlist_tracks(prov_playlist_id))
+ # User-created playlists can contain Track, Radio, PodcastEpisode, and Audiobook items
+ result: list[PlaylistPlayableItem] = []
playlist_items = await self._read_playlist_file_items(prov_playlist_id)
for index, uri in enumerate(playlist_items, 1):
try:
track = await media_controller.get_provider_item(
item_id, provider_instance_id_or_domain
)
- assert isinstance(track, Track)
- track.position = index
- result.append(track)
+ if isinstance(track, get_args(PlaylistPlayableItem)):
+ playlist_item = cast("PlaylistPlayableItem", track)
+ playlist_item.position = index
+ result.append(playlist_item)
+ else:
+ self.logger.warning(
+ "Unsupported media type in playlist %s: %s", prov_playlist_id, type(track)
+ )
except (MediaNotFoundError, InvalidDataError, ProviderUnavailableError) as err:
self.logger.warning(
"Skipping %s in playlist %s: %s", uri, prov_playlist_id, str(err)