Plex: Add support for importing collections as playlists (#2521)
authoranatosun <33899455+anatosun@users.noreply.github.com>
Tue, 21 Oct 2025 16:28:32 +0000 (18:28 +0200)
committerGitHub <noreply@github.com>
Tue, 21 Oct 2025 16:28:32 +0000 (18:28 +0200)
music_assistant/providers/plex/__init__.py

index c28de5d05e6be6dbef8904353d28d19d80de4e63..c7d44b4563ca6b36e6a767b9bc120f702b1aa845 100644 (file)
@@ -65,6 +65,7 @@ if TYPE_CHECKING:
     from collections.abc import AsyncGenerator, Callable, Coroutine
 
     from music_assistant_models.provider import ProviderManifest
+    from plexapi.library import LibraryMediaTag as PlexCollection
     from plexapi.library import MusicSection as PlexMusicSection
     from plexapi.media import AudioStream as PlexAudioStream
     from plexapi.media import Media as PlexMedia
@@ -85,6 +86,8 @@ CONF_LOCAL_SERVER_IP = "local_server_ip"
 CONF_LOCAL_SERVER_PORT = "local_server_port"
 CONF_LOCAL_SERVER_SSL = "local_server_ssl"
 CONF_LOCAL_SERVER_VERIFY_CERT = "local_server_verify_cert"
+CONF_IMPORT_COLLECTIONS = "import_collections"
+CONF_COLLECTION_PREFIX = "collection_prefix"
 
 FAKE_ARTIST_PREFIX = "_fake://"
 
@@ -319,6 +322,29 @@ async def get_config_entries(  # noqa: PLR0915
             )
         )
 
+    # Collection import options (advanced settings)
+    entries.append(
+        ConfigEntry(
+            key=CONF_IMPORT_COLLECTIONS,
+            type=ConfigEntryType.BOOLEAN,
+            label="Import Collections",
+            description="Import collections (tracks, albums, or artists) as playlists",
+            default_value=False,
+            category="advanced",
+        )
+    )
+    entries.append(
+        ConfigEntry(
+            key=CONF_COLLECTION_PREFIX,
+            type=ConfigEntryType.STRING,
+            label="Collection Prefix",
+            description="Prefix to add to collection names when imported as playlists",
+            default_value="Collection: ",
+            depends_on=CONF_IMPORT_COLLECTIONS,
+            category="advanced",
+        )
+    )
+
     # return all config entries
     return tuple(entries)
 
@@ -665,6 +691,40 @@ class PlexProvider(MusicProvider):
         playlist.is_editable = not plex_playlist.smart
         return playlist
 
+    async def _parse_collection(self, plex_collection: PlexCollection) -> Playlist:
+        """Parse a Plex Collection response to a Playlist object."""
+        # Get the configured collection prefix
+        collection_prefix = str(self.config.get_value(CONF_COLLECTION_PREFIX) or "")
+
+        # Collections are imported as playlists with the configured prefix
+        playlist = Playlist(
+            item_id=f"collection:{plex_collection.key}",
+            provider=self.lookup_key,
+            name=f"{collection_prefix}{plex_collection.title}",
+            provider_mappings={
+                ProviderMapping(
+                    item_id=f"collection:{plex_collection.key}",
+                    provider_domain=self.domain,
+                    provider_instance=self.instance_id,
+                )
+            },
+        )
+        # Add collection poster/thumbnail if available
+        if thumb := plex_collection.firstAttr("thumb", "composite"):
+            playlist.metadata.images = UniqueList(
+                [
+                    MediaItemImage(
+                        type=ImageType.THUMB,
+                        path=thumb,
+                        provider=self.lookup_key,
+                        remotely_accessible=False,
+                    )
+                ]
+            )
+        # Collections are not editable in Music Assistant
+        playlist.is_editable = False
+        return playlist
+
     async def _parse_track(self, plex_track: PlexTrack) -> Track:
         """Parse a Plex Track response to a Track model object."""
         if plex_track.media:
@@ -824,6 +884,12 @@ class PlexProvider(MusicProvider):
         for playlist in playlists_obj:
             yield await self._parse_playlist(playlist)
 
+        # Import collections as playlists if enabled
+        if self.config.get_value(CONF_IMPORT_COLLECTIONS):
+            collections_obj = await self._run_async(self._plex_library.collections)
+            for collection in collections_obj:
+                yield await self._parse_collection(collection)
+
     async def get_library_tracks(self) -> AsyncGenerator[Track, None]:
         """Retrieve library tracks from Plex Music."""
         page_size = 500
@@ -893,6 +959,18 @@ class PlexProvider(MusicProvider):
     @use_cache(3600 * 3)  # Cache for 3 hours
     async def get_playlist(self, prov_playlist_id: str) -> Playlist:
         """Get full playlist details by id."""
+        # Check if this is a collection (collections have the format "collection:<key>")
+        if prov_playlist_id.startswith("collection:"):
+            # Extract the collection key
+            collection_key = prov_playlist_id.replace("collection:", "")
+            # Fetch the collection
+            if plex_collection := await self._run_async(
+                self._plex_library.fetchItem, collection_key
+            ):
+                return await self._parse_collection(plex_collection)
+            msg = f"Collection {prov_playlist_id} not found"
+            raise MediaNotFoundError(msg)
+
         if plex_playlist := await self._get_data(prov_playlist_id, PlexPlaylist):
             return await self._parse_playlist(plex_playlist)
         msg = f"Item {prov_playlist_id} not found"
@@ -905,6 +983,32 @@ class PlexProvider(MusicProvider):
         if page > 0:
             # paging not supported, we always return the whole list at once
             return []
+
+        # Check if this is a collection (collections have the format "collection:<key>")
+        if prov_playlist_id.startswith("collection:"):
+            # Extract the collection key
+            collection_key = prov_playlist_id.replace("collection:", "")
+            # Fetch the collection
+            plex_collection = await self._run_async(self._plex_library.fetchItem, collection_key)
+            if not plex_collection:
+                msg = f"Collection {prov_playlist_id} not found"
+                raise MediaNotFoundError(msg)
+            if not (collection_items := await self._run_async(plex_collection.items)):
+                return result
+            # Collections can contain tracks, albums, or artists - we only want tracks
+            for item in collection_items:
+                if item.type == "track":
+                    if track := await self._parse_track(item):
+                        track.position = len(result) + 1
+                        result.append(track)
+                elif item.type == "album":
+                    # If the collection contains albums, get all tracks from each album
+                    album_tracks = await self.get_album_tracks(item.key)
+                    for album_track in album_tracks:
+                        album_track.position = len(result) + 1
+                        result.append(album_track)
+            return result
+
         plex_playlist: PlexPlaylist = await self._get_data(prov_playlist_id, PlexPlaylist)
         if not (playlist_items := await self._run_async(plex_playlist.items)):
             return result