"""Manage MediaItems of type Playlist."""
from __future__ import annotations
+import random
from ctypes import Union
-from random import choice, random
from time import time
from typing import Any, List, Optional, Tuple
media_type = MediaType.PLAYLIST
item_cls = Playlist
- async def get_playlist_by_name(self, name: str) -> Playlist | None:
- """Get in-library playlist by name."""
- return await self.mass.database.get_row(self.db_table, {"name": name})
-
async def tracks(
self,
item_id: str,
playlist_tracks = await self._get_provider_playlist_tracks(
item_id=item_id, provider=provider, provider_id=provider_id
)
- # Grab a random track from the playlist that we use to obtain similar tracks for
- track = choice(playlist_tracks)
- # Calculate no of songs to grab from each list at a 50/50 ratio
- total_no_of_tracks = limit + limit % 2
- tracks_per_list = int(total_no_of_tracks / 2)
- # Grab similar tracks from the music provider
- similar_tracks = await prov.get_similar_tracks(
- prov_track_id=track.item_id, limit=tracks_per_list
- )
- # Merge playlist content with similar tracks
- dynamic_playlist = [
- *sorted(playlist_tracks, key=lambda n: random())[:tracks_per_list],
- *sorted(similar_tracks, key=lambda n: random())[:tracks_per_list],
- ]
- return sorted(dynamic_playlist, key=lambda n: random())
+ # filter out unavailable tracks
+ playlist_tracks = [x for x in playlist_tracks if x.available]
+ limit = min(limit, len(playlist_tracks))
+ # use set to prevent duplicates
+ final_items = set()
+ # to account for playlists with mixed content we grab suggestions from a few
+ # random playlist tracks to prevent getting too many tracks of one of the
+ # source playlist's genres.
+ while len(final_items) < limit:
+ # grab 5 random tracks from the playlist
+ base_tracks = random.sample(playlist_tracks, 5)
+ # add the source/base playlist tracks to the final list...
+ final_items.update(base_tracks)
+ # get 5 suggestions for one of the base tracks
+ base_track = next(x for x in base_tracks if x.available)
+ similar_tracks = await prov.get_similar_tracks(
+ prov_track_id=base_track.item_id, limit=5
+ )
+ final_items.update(x for x in similar_tracks if x.available)
+
+ # NOTE: In theory we can return a few more items than limit here
+ # Shuffle the final items list
+ return random.sample(final_items, len(final_items))
async def _get_dynamic_tracks(
self, media_item: Playlist, limit: int = 25