import contextlib
from collections.abc import Iterable
-from random import choice, random
from typing import TYPE_CHECKING, Any
from music_assistant.common.helpers.json import serialize_to_json
)
return items
- async def _get_provider_dynamic_tracks(
+ async def _get_provider_dynamic_base_tracks(
self,
item_id: str,
provider_instance_id_or_domain: str,
- limit: int = 25,
):
- """Generate a dynamic list of tracks based on the album content."""
+ """Get the list of base tracks from the controller used to calculate the dynamic radio."""
assert provider_instance_id_or_domain != "library"
- prov = self.mass.get_provider(provider_instance_id_or_domain)
- if prov is None:
- return []
- if ProviderFeature.SIMILAR_TRACKS not in prov.supported_features:
- return []
- album_tracks = await self._get_provider_album_tracks(
- item_id, provider_instance_id_or_domain
- )
- # Grab a random track from the album that we use to obtain similar tracks for
- track = choice(album_tracks)
- # Calculate no of songs to grab from each list at a 10/90 ratio
- total_no_of_tracks = limit + limit % 2
- no_of_album_tracks = int(total_no_of_tracks * 10 / 100)
- no_of_similar_tracks = int(total_no_of_tracks * 90 / 100)
- # Grab similar tracks from the music provider
- similar_tracks = await prov.get_similar_tracks(
- prov_track_id=track.item_id, limit=no_of_similar_tracks
- )
- # Merge album content with similar tracks
- # ruff: noqa: ARG005
- dynamic_playlist = [
- *sorted(album_tracks, key=lambda n: random())[:no_of_album_tracks],
- *sorted(similar_tracks, key=lambda n: random())[:no_of_similar_tracks],
- ]
- return sorted(dynamic_playlist, key=lambda n: random())
+ return await self._get_provider_album_tracks(item_id, provider_instance_id_or_domain)
async def _get_dynamic_tracks(
self,
import asyncio
import contextlib
-from random import choice, random
from typing import TYPE_CHECKING, Any
from music_assistant.common.helpers.json import serialize_to_json
await self._set_provider_mappings(db_id, provider_mappings, overwrite)
self.logger.debug("updated %s in database: (id %s)", update.name, db_id)
- async def _get_provider_dynamic_tracks(
+ async def _get_provider_dynamic_base_tracks(
self,
item_id: str,
provider_instance_id_or_domain: str,
- limit: int = 25,
):
- """Generate a dynamic list of tracks based on the artist's top tracks."""
+ """Get the list of base tracks from the controller used to calculate the dynamic radio."""
assert provider_instance_id_or_domain != "library"
- prov = self.mass.get_provider(provider_instance_id_or_domain)
- if prov is None:
- return []
- if ProviderFeature.SIMILAR_TRACKS not in prov.supported_features:
- return []
- top_tracks = await self.get_provider_artist_toptracks(
+ return await self.get_provider_artist_toptracks(
item_id,
provider_instance_id_or_domain,
)
- # Grab a random track from the album that we use to obtain similar tracks for
- track = choice(top_tracks)
- # Calculate no of songs to grab from each list at a 10/90 ratio
- total_no_of_tracks = limit + limit % 2
- no_of_artist_tracks = int(total_no_of_tracks * 10 / 100)
- no_of_similar_tracks = int(total_no_of_tracks * 90 / 100)
- # Grab similar tracks from the music provider
- similar_tracks = await prov.get_similar_tracks(
- prov_track_id=track.item_id, limit=no_of_similar_tracks
- )
- # Merge album content with similar tracks
- dynamic_playlist = [
- *sorted(top_tracks, key=lambda _: random())[:no_of_artist_tracks],
- *sorted(similar_tracks, key=lambda _: random())[:no_of_similar_tracks],
- ]
- return sorted(dynamic_playlist, key=lambda n: random()) # noqa: ARG005
async def _get_dynamic_tracks(
self,
with suppress(AssertionError):
await self.remove_item_from_library(db_id)
- async def dynamic_tracks(
+ async def dynamic_base_tracks(
self,
item_id: str,
provider_instance_id_or_domain: str,
- limit: int = 25,
) -> list[Track]:
- """Return a dynamic list of tracks based on the given item."""
+ """Return a list of base tracks to calculate a list of dynamic tracks."""
ref_item = await self.get(item_id, provider_instance_id_or_domain)
for prov_mapping in ref_item.provider_mappings:
prov = self.mass.get_provider(prov_mapping.provider_instance)
continue
if ProviderFeature.SIMILAR_TRACKS not in prov.supported_features:
continue
- return await self._get_provider_dynamic_tracks(
+ return await self._get_provider_dynamic_base_tracks(
prov_mapping.item_id,
prov_mapping.provider_instance,
- limit=limit,
)
# Fallback to the default implementation
return await self._get_dynamic_tracks(ref_item)
"""
@abstractmethod
- async def _get_provider_dynamic_tracks(
+ async def _get_provider_dynamic_base_tracks(
self,
item_id: str,
provider_instance_id_or_domain: str,
- limit: int = 25,
) -> list[Track]:
- """Generate a dynamic list of tracks based on the item's content."""
+ """Get the list of base tracks from the controller used to calculate the dynamic radio."""
@abstractmethod
async def _get_dynamic_tracks(self, media_item: ItemCls, limit: int = 25) -> list[Track]:
)
return items
- async def _get_provider_dynamic_tracks(
+ async def _get_provider_dynamic_base_tracks(
self,
item_id: str,
provider_instance_id_or_domain: str,
- limit: int = 25,
):
- """Generate a dynamic list of tracks based on the playlist content."""
+ """Get the list of base tracks from the controller used to calculate the dynamic radio."""
assert provider_instance_id_or_domain != "library"
- provider = self.mass.get_provider(provider_instance_id_or_domain)
- if not provider or ProviderFeature.SIMILAR_TRACKS not in provider.supported_features:
- return []
playlist = await self.get(item_id, provider_instance_id_or_domain)
- playlist_tracks = [
+ return [
x
async for x in self.tracks(playlist.item_id, playlist.provider)
# filter out unavailable tracks
if x.available
]
- limit = min(limit, len(playlist_tracks))
- # use set to prevent duplicates
- final_items: list[Track] = []
- # to account for playlists with mixed content we grab suggestions from a few
- # random playlist tracks to prevent getting too many tracks of one of the
- # source playlist's genres.
- sample_size = min(len(playlist_tracks), 5)
- while len(final_items) < limit:
- # grab 5 random tracks from the playlist
- base_tracks = random.sample(playlist_tracks, sample_size)
- # add the source/base playlist tracks to the final list...
- final_items.extend(base_tracks)
- # get 5 suggestions for one of the base tracks
- base_track = next(x for x in base_tracks if x.available)
- similar_tracks = await provider.get_similar_tracks(
- prov_track_id=base_track.item_id, limit=5
- )
- final_items.extend(x for x in similar_tracks if x.available)
- # Remove duplicate tracks
- radio_items = {track.sort_name: track for track in final_items}.values()
- # NOTE: In theory we can return a few more items than limit here
- # Shuffle the final items list
- return random.sample(list(radio_items), len(radio_items))
async def _get_dynamic_tracks(
self,
await self._set_provider_mappings(db_id, provider_mappings, overwrite)
self.logger.debug("updated %s in database: (id %s)", update.name, db_id)
- async def _get_provider_dynamic_tracks(
+ async def _get_provider_dynamic_base_tracks(
self,
item_id: str,
provider_instance_id_or_domain: str,
limit: int = 25,
) -> list[Track]:
- """Generate a dynamic list of tracks based on the item's content."""
+ """Get the list of base tracks from the controller used to calculate the dynamic radio."""
msg = "Dynamic tracks not supported for Radio MediaItem"
raise NotImplementedError(msg)
)
return matches
- async def _get_provider_dynamic_tracks(
+ async def get_provider_similar_tracks(
+ self, item_id: str, provider_instance_id_or_domain: str, limit: int = 25
+ ):
+ """Get a list of similar tracks from the provider, based on the track."""
+ ref_item = await self.get(item_id, provider_instance_id_or_domain)
+ for prov_mapping in ref_item.provider_mappings:
+ prov = self.mass.get_provider(prov_mapping.provider_instance)
+ if prov is None:
+ continue
+ if ProviderFeature.SIMILAR_TRACKS not in prov.supported_features:
+ continue
+ # Grab similar tracks from the music provider
+ return await prov.get_similar_tracks(prov_track_id=prov_mapping.item_id, limit=limit)
+ return []
+
+ async def _get_provider_dynamic_base_tracks(
self,
item_id: str,
provider_instance_id_or_domain: str,
- limit: int = 25,
):
- """Generate a dynamic list of tracks based on the track."""
+ """Get the list of base tracks from the controller used to calculate the dynamic radio."""
assert provider_instance_id_or_domain != "library"
- prov = self.mass.get_provider(provider_instance_id_or_domain)
- if prov is None:
- return []
- if ProviderFeature.SIMILAR_TRACKS not in prov.supported_features:
- return []
- # Grab similar tracks from the music provider
- return await prov.get_similar_tracks(prov_track_id=item_id, limit=limit)
+ return [await self.get(item_id, provider_instance_id_or_domain)]
async def _get_dynamic_tracks(
self,
)
from music_assistant.server.helpers.api import api_command
from music_assistant.server.helpers.audio import get_stream_details
+from music_assistant.server.helpers.throttle_retry import BYPASS_THROTTLER
from music_assistant.server.models.core_controller import CoreController
if TYPE_CHECKING:
- start_item: Optional item to start the playlist or album from.
"""
# ruff: noqa: PLR0915,PLR0912
+ # we use a contextvar to bypass the throttler for this asyncio task/context
+ # this makes sure that playback has priority over other requests that may be
+ # happening in the background
+ BYPASS_THROTTLER.set(True)
queue = self._queues[queue_id]
# always fetch the underlying player so we can raise early if its not available
queue_player = self.mass.players.get(queue_id, True)
queue.radio_source += radio_source
# Use collected media items to calculate the radio if radio mode is on
if radio_mode:
- tracks = await self._get_radio_tracks(queue_id)
+ tracks = await self._get_radio_tracks(queue_id=queue_id, is_initial_radio_mode=True)
# only add valid/available items
queue_items = [QueueItem.from_media_item(queue_id, x) for x in tracks if x and x.available]
if getattr(self, debounce_key, None):
return
setattr(self, debounce_key, True)
- tracks = await self._get_radio_tracks(queue_id)
+ tracks = await self._get_radio_tracks(queue_id=queue_id, is_initial_radio_mode=False)
# fill queue - filter out unavailable items
queue_items = [QueueItem.from_media_item(queue_id, x) for x in tracks if x.available]
self.load(
media=self.player_media_from_queue_item(next_item, queue.flow_mode),
)
- async def _get_radio_tracks(self, queue_id: str) -> list[MediaItemType]:
+ async def _get_radio_tracks(
+ self, queue_id: str, is_initial_radio_mode: bool = False
+ ) -> list[Track]:
"""Call the registered music providers for dynamic tracks."""
queue = self._queues[queue_id]
assert queue.radio_source, "No Radio item(s) loaded/active!"
- tracks: list[MediaItemType] = []
- # grab dynamic tracks for (all) source items
+ available_base_tracks: list[Track] = []
+ base_track_sample_size = 5
+ # Grab all the available base tracks based on the selected source items.
# shuffle the source items, just in case
for radio_item in random.sample(queue.radio_source, len(queue.radio_source)):
ctrl = self.mass.music.get_controller(radio_item.media_type)
- tracks += await ctrl.dynamic_tracks(radio_item.item_id, radio_item.provider)
- # make sure we do not grab too much items
- if len(tracks) >= 50:
+ available_base_tracks += [
+ track
+ for track in await ctrl.dynamic_base_tracks(radio_item.item_id, radio_item.provider)
+ # Avoid duplicate base tracks
+ if track not in available_base_tracks
+ ]
+ # Sample tracks from the base tracks, which will be used to calculate the dynamic ones
+ base_tracks = random.sample(
+ available_base_tracks, min(base_track_sample_size, len(available_base_tracks))
+ )
+ # Use a set to avoid duplicate dynamic tracks
+ dynamic_tracks: set[Track] = set()
+ track_ctrl = self.mass.music.get_controller(MediaType.TRACK)
+ # Use base tracks + Trackcontroller to obtain similar tracks for every base Track
+ for base_track in base_tracks:
+ [
+ dynamic_tracks.add(track)
+ for track in await track_ctrl.get_provider_similar_tracks(
+ base_track.item_id, base_track.provider
+ )
+ if track not in base_tracks
+ ]
+ if len(dynamic_tracks) >= 50:
break
- return tracks
+ queue_tracks: list[Track] = []
+ dynamic_tracks = list(dynamic_tracks)
+ # Only include the sampled base tracks when the radio mode is first initialized
+ if is_initial_radio_mode:
+ queue_tracks += [base_tracks[0]]
+ # Exhaust base tracks with the pattern of BDDBDDBDD (1 base track + 2 dynamic tracks)
+ if len(base_tracks) > 1:
+ for base_track in base_tracks[1:]:
+ queue_tracks += [base_track]
+ queue_tracks += random.sample(dynamic_tracks, 2)
+ # Add dynamic tracks to the queue, make sure to exclude already picked tracks
+ remaining_dynamic_tracks = [t for t in dynamic_tracks if t not in queue_tracks]
+ queue_tracks += random.sample(
+ remaining_dynamic_tracks, min(len(remaining_dynamic_tracks), 25)
+ )
+ return queue_tracks
async def get_artist_tracks(self, artist: Artist) -> list[Track]:
"""Return tracks for given artist, based on user preference."""