from collections.abc import AsyncGenerator, Callable, Coroutine
from music_assistant_models.provider import ProviderManifest
+ from plexapi.library import LibraryMediaTag as PlexCollection
from plexapi.library import MusicSection as PlexMusicSection
from plexapi.media import AudioStream as PlexAudioStream
from plexapi.media import Media as PlexMedia
CONF_LOCAL_SERVER_PORT = "local_server_port"
CONF_LOCAL_SERVER_SSL = "local_server_ssl"
CONF_LOCAL_SERVER_VERIFY_CERT = "local_server_verify_cert"
+CONF_IMPORT_COLLECTIONS = "import_collections"
+CONF_COLLECTION_PREFIX = "collection_prefix"
FAKE_ARTIST_PREFIX = "_fake://"
)
)
+ # Collection import options (advanced settings)
+ entries.append(
+ ConfigEntry(
+ key=CONF_IMPORT_COLLECTIONS,
+ type=ConfigEntryType.BOOLEAN,
+ label="Import Collections",
+ description="Import collections (tracks, albums, or artists) as playlists",
+ default_value=False,
+ category="advanced",
+ )
+ )
+ entries.append(
+ ConfigEntry(
+ key=CONF_COLLECTION_PREFIX,
+ type=ConfigEntryType.STRING,
+ label="Collection Prefix",
+ description="Prefix to add to collection names when imported as playlists",
+ default_value="Collection: ",
+ depends_on=CONF_IMPORT_COLLECTIONS,
+ category="advanced",
+ )
+ )
+
# return all config entries
return tuple(entries)
playlist.is_editable = not plex_playlist.smart
return playlist
+ async def _parse_collection(self, plex_collection: PlexCollection) -> Playlist:
+ """Parse a Plex Collection response to a Playlist object."""
+ # Get the configured collection prefix
+ collection_prefix = str(self.config.get_value(CONF_COLLECTION_PREFIX) or "")
+
+ # Collections are imported as playlists with the configured prefix
+ playlist = Playlist(
+ item_id=f"collection:{plex_collection.key}",
+ provider=self.lookup_key,
+ name=f"{collection_prefix}{plex_collection.title}",
+ provider_mappings={
+ ProviderMapping(
+ item_id=f"collection:{plex_collection.key}",
+ provider_domain=self.domain,
+ provider_instance=self.instance_id,
+ )
+ },
+ )
+ # Add collection poster/thumbnail if available
+ if thumb := plex_collection.firstAttr("thumb", "composite"):
+ playlist.metadata.images = UniqueList(
+ [
+ MediaItemImage(
+ type=ImageType.THUMB,
+ path=thumb,
+ provider=self.lookup_key,
+ remotely_accessible=False,
+ )
+ ]
+ )
+ # Collections are not editable in Music Assistant
+ playlist.is_editable = False
+ return playlist
+
async def _parse_track(self, plex_track: PlexTrack) -> Track:
"""Parse a Plex Track response to a Track model object."""
if plex_track.media:
for playlist in playlists_obj:
yield await self._parse_playlist(playlist)
+ # Import collections as playlists if enabled
+ if self.config.get_value(CONF_IMPORT_COLLECTIONS):
+ collections_obj = await self._run_async(self._plex_library.collections)
+ for collection in collections_obj:
+ yield await self._parse_collection(collection)
+
async def get_library_tracks(self) -> AsyncGenerator[Track, None]:
"""Retrieve library tracks from Plex Music."""
page_size = 500
@use_cache(3600 * 3) # Cache for 3 hours
async def get_playlist(self, prov_playlist_id: str) -> Playlist:
"""Get full playlist details by id."""
+ # Check if this is a collection (collections have the format "collection:<key>")
+ if prov_playlist_id.startswith("collection:"):
+ # Extract the collection key
+ collection_key = prov_playlist_id.replace("collection:", "")
+ # Fetch the collection
+ if plex_collection := await self._run_async(
+ self._plex_library.fetchItem, collection_key
+ ):
+ return await self._parse_collection(plex_collection)
+ msg = f"Collection {prov_playlist_id} not found"
+ raise MediaNotFoundError(msg)
+
if plex_playlist := await self._get_data(prov_playlist_id, PlexPlaylist):
return await self._parse_playlist(plex_playlist)
msg = f"Item {prov_playlist_id} not found"
if page > 0:
# paging not supported, we always return the whole list at once
return []
+
+ # Check if this is a collection (collections have the format "collection:<key>")
+ if prov_playlist_id.startswith("collection:"):
+ # Extract the collection key
+ collection_key = prov_playlist_id.replace("collection:", "")
+ # Fetch the collection
+ plex_collection = await self._run_async(self._plex_library.fetchItem, collection_key)
+ if not plex_collection:
+ msg = f"Collection {prov_playlist_id} not found"
+ raise MediaNotFoundError(msg)
+ if not (collection_items := await self._run_async(plex_collection.items)):
+ return result
+ # Collections can contain tracks, albums, or artists - we only want tracks
+ for item in collection_items:
+ if item.type == "track":
+ if track := await self._parse_track(item):
+ track.position = len(result) + 1
+ result.append(track)
+ elif item.type == "album":
+ # If the collection contains albums, get all tracks from each album
+ album_tracks = await self.get_album_tracks(item.key)
+ for album_track in album_tracks:
+ album_track.position = len(result) + 1
+ result.append(album_track)
+ return result
+
plex_playlist: PlexPlaylist = await self._get_data(prov_playlist_id, PlexPlaylist)
if not (playlist_items := await self._run_async(plex_playlist.items)):
return result