await mass.music.start_sync()
# get some data
- artist_count = await mass.music.artists.count()
- artist_count_lib = await mass.music.artists.count(True)
- print(f"Got {artist_count} artists ({artist_count_lib} in library)")
- album_count = await mass.music.albums.count()
- album_count_lib = await mass.music.albums.count(True)
- print(f"Got {album_count} albums ({album_count_lib} in library)")
- track_count = await mass.music.tracks.count()
- track_count_lib = await mass.music.tracks.count(True)
- print(f"Got {track_count} tracks ({track_count_lib} in library)")
- radio_count = await mass.music.radio.count(True)
- print(f"Got {radio_count} radio stations in library")
- playlists = await mass.music.playlists.db_items(True)
- print(f"Got {len(playlists)} playlists in library")
+ artists = await mass.music.artists.db_items()
+ artists_lib = await mass.music.artists.db_items(True)
+ print(
+ f"Got {artists_lib.total} artists in library (of {artists.total} total in db)"
+ )
+
+ albums = await mass.music.albums.db_items()
+ albums_lib = await mass.music.albums.db_items(True)
+ print(
+ f"Got {albums_lib.total} albums in library (of {albums.total} total in db)"
+ )
+
+ tracks = await mass.music.tracks.db_items()
+ tracks_lib = await mass.music.tracks.db_items(True)
+ print(
+ f"Got {tracks_lib.total} tracks in library (of {tracks.total} total in db)"
+ )
+
+ playlists = await mass.music.playlists.db_items()
+ playlists_lib = await mass.music.playlists.db_items(True)
+ print(
+ f"Got {playlists_lib.total} tracks in library (of {playlists.total} total in db)"
+ )
+
# register a player
test_player1 = TestPlayer("test1")
test_player2 = TestPlayer("test2")
# we can also send an uri, such as spotify://track/abcdfefgh
# or database://playlist/1
# or a list of items
- if len(playlists) > 0:
- await test_player1.active_queue.play_media(playlists[0])
+ if playlists.count > 0:
+ await test_player1.active_queue.play_media(playlists.items[0])
await asyncio.sleep(3600)
from __future__ import annotations
import asyncio
+from random import choice, random
from typing import List, Optional, Union
from music_assistant.constants import VARIOUS_ARTISTS
from music_assistant.helpers.database import TABLE_ALBUMS, TABLE_TRACKS
from music_assistant.helpers.json import json_serializer
from music_assistant.models.enums import EventType, MusicProviderFeature, ProviderType
-from music_assistant.models.errors import MediaNotFoundError
+from music_assistant.models.errors import (
+ MediaNotFoundError,
+ UnsupportedFeaturedException,
+)
from music_assistant.models.event import MassEvent
from music_assistant.models.media_controller import MediaControllerBase
from music_assistant.models.media_items import (
)
return db_item
- async def _get_provider_album_tracks(
- self,
- item_id: str,
- provider: Optional[ProviderType] = None,
- provider_id: Optional[str] = None,
- ) -> List[Track]:
- """Return album tracks for the given provider album id."""
- prov = self.mass.music.get_provider(provider_id or provider)
- if not prov:
- return []
- full_album = await self.get_provider_item(item_id, provider_id or provider)
- # prefer cache items (if any)
- cache_key = f"{prov.type.value}.albumtracks.{item_id}"
- cache_checksum = full_album.metadata.checksum
- if cache := await self.mass.cache.get(cache_key, checksum=cache_checksum):
- return [Track.from_dict(x) for x in cache]
- # no items in cache - get listing from provider
- items = []
- for track in await prov.get_album_tracks(item_id):
- # make sure that the (full) album is stored on the tracks
- track.album = full_album
- if full_album.metadata.images:
- track.metadata.images = full_album.metadata.images
- items.append(track)
- # store (serializable items) in cache
- self.mass.create_task(
- self.mass.cache.set(
- cache_key, [x.to_dict() for x in items], checksum=cache_checksum
- )
- )
- return items
-
- async def _get_db_album_tracks(
- self,
- item_id: str,
- ) -> List[Track]:
- """Return in-database album tracks for the given database album."""
- db_album = await self.get_db_item(item_id)
- # simply grab all tracks in the db that are linked to this album
- # TODO: adjust to json query instead of text search?
- query = f"SELECT * FROM tracks WHERE albums LIKE '%\"{item_id}\"%'"
- result = []
- for track in await self.mass.music.tracks.get_db_items_by_query(query):
- if album_mapping := next(
- (x for x in track.albums if x.item_id == db_album.item_id), None
- ):
- # make sure that the full album is set on the track and prefer the album's images
- track.album = db_album
- if db_album.metadata.images:
- track.metadata.images = db_album.metadata.images
- # apply the disc and track number from the mapping
- track.disc_number = album_mapping.disc_number
- track.track_number = album_mapping.track_number
- result.append(track)
- return sorted(result, key=lambda x: (x.disc_number or 0, x.track_number or 0))
-
async def add_db_item(self, item: Album, overwrite_existing: bool = False) -> Album:
"""Add a new record to the database."""
assert item.provider_ids, f"Album {item.name} is missing provider id(s)"
# delete the album itself from db
await super().delete_db_item(item_id)
+ async def _get_provider_album_tracks(
+ self,
+ item_id: str,
+ provider: Optional[ProviderType] = None,
+ provider_id: Optional[str] = None,
+ ) -> List[Track]:
+ """Return album tracks for the given provider album id."""
+ prov = self.mass.music.get_provider(provider_id or provider)
+ if not prov:
+ return []
+ full_album = await self.get_provider_item(item_id, provider_id or provider)
+ # prefer cache items (if any)
+ cache_key = f"{prov.type.value}.albumtracks.{item_id}"
+ cache_checksum = full_album.metadata.checksum
+ if cache := await self.mass.cache.get(cache_key, checksum=cache_checksum):
+ return [Track.from_dict(x) for x in cache]
+ # no items in cache - get listing from provider
+ items = []
+ for track in await prov.get_album_tracks(item_id):
+ # make sure that the (full) album is stored on the tracks
+ track.album = full_album
+ if full_album.metadata.images:
+ track.metadata.images = full_album.metadata.images
+ items.append(track)
+ # store (serializable items) in cache
+ self.mass.create_task(
+ self.mass.cache.set(
+ cache_key, [x.to_dict() for x in items], checksum=cache_checksum
+ )
+ )
+ return items
+
+ async def _get_provider_dynamic_tracks(
+ self,
+ item_id: str,
+ provider: Optional[ProviderType] = None,
+ provider_id: Optional[str] = None,
+ limit: int = 25,
+ ):
+ """Generate a dynamic list of tracks based on the album content."""
+ prov = self.mass.music.get_provider(provider_id or provider)
+ if (
+ not prov
+ or MusicProviderFeature.SIMILAR_TRACKS not in prov.supported_features
+ ):
+ return []
+ album_tracks = await self._get_provider_album_tracks(
+ item_id=item_id, provider=provider, provider_id=provider_id
+ )
+ # Grab a random track from the album that we use to obtain similar tracks for
+ track = choice(album_tracks)
+ # Calculate no of songs to grab from each list at a 10/90 ratio
+ total_no_of_tracks = limit + limit % 2
+ no_of_album_tracks = int(total_no_of_tracks * 10 / 100)
+ no_of_similar_tracks = int(total_no_of_tracks * 90 / 100)
+ # Grab similar tracks from the music provider
+ similar_tracks = await prov.get_similar_tracks(
+ prov_track_id=track.item_id, limit=no_of_similar_tracks
+ )
+ # Merge album content with similar tracks
+ dynamic_playlist = [
+ *sorted(album_tracks, key=lambda n: random())[:no_of_album_tracks],
+ *sorted(similar_tracks, key=lambda n: random())[:no_of_similar_tracks],
+ ]
+ return sorted(dynamic_playlist, key=lambda n: random())
+
+ async def _get_dynamic_tracks(self, media_item: Album, limit=25) -> List[Track]:
+ """Get dynamic list of tracks for given item, fallback/default implementation."""
+ # TODO: query metadata provider(s) to get similar tracks (or tracks from similar artists)
+ raise UnsupportedFeaturedException(
+ "No Music Provider found that supports requesting similar tracks."
+ )
+
+ async def _get_db_album_tracks(
+ self,
+ item_id: str,
+ ) -> List[Track]:
+ """Return in-database album tracks for the given database album."""
+ db_album = await self.get_db_item(item_id)
+ # simply grab all tracks in the db that are linked to this album
+ # TODO: adjust to json query instead of text search?
+ query = f"SELECT * FROM tracks WHERE albums LIKE '%\"{item_id}\"%'"
+ result = []
+ for track in await self.mass.music.tracks.get_db_items_by_query(query):
+ if album_mapping := next(
+ (x for x in track.albums if x.item_id == db_album.item_id), None
+ ):
+ # make sure that the full album is set on the track and prefer the album's images
+ track.album = db_album
+ if db_album.metadata.images:
+ track.metadata.images = db_album.metadata.images
+ # apply the disc and track number from the mapping
+ track.disc_number = album_mapping.disc_number
+ track.track_number = album_mapping.track_number
+ result.append(track)
+ return sorted(result, key=lambda x: (x.disc_number or 0, x.track_number or 0))
+
async def _match(self, db_album: Album) -> None:
"""
Try to find matching album on all providers for the provided (database) album.
import asyncio
import itertools
+from random import choice, random
from time import time
from typing import Any, Dict, List, Optional
from music_assistant.helpers.database import TABLE_ALBUMS, TABLE_ARTISTS, TABLE_TRACKS
from music_assistant.helpers.json import json_serializer
from music_assistant.models.enums import EventType, MusicProviderFeature, ProviderType
-from music_assistant.models.errors import MediaNotFoundError
+from music_assistant.models.errors import (
+ MediaNotFoundError,
+ UnsupportedFeaturedException,
+)
from music_assistant.models.event import MassEvent
from music_assistant.models.media_controller import MediaControllerBase
from music_assistant.models.media_items import (
# delete the artist itself from db
await super().delete_db_item(item_id)
+ async def _get_provider_dynamic_tracks(
+ self,
+ item_id: str,
+ provider: Optional[ProviderType] = None,
+ provider_id: Optional[str] = None,
+ limit: int = 25,
+ ):
+ """Generate a dynamic list of tracks based on the artist's top tracks."""
+ prov = self.mass.music.get_provider(provider_id or provider)
+ if (
+ not prov
+ or MusicProviderFeature.SIMILAR_TRACKS not in prov.supported_features
+ ):
+ return []
+ top_tracks = await self.get_provider_artist_toptracks(
+ item_id=item_id, provider=provider, provider_id=provider_id
+ )
+ # Grab a random track from the album that we use to obtain similar tracks for
+ track = choice(top_tracks)
+ # Calculate no of songs to grab from each list at a 10/90 ratio
+ total_no_of_tracks = limit + limit % 2
+ no_of_artist_tracks = int(total_no_of_tracks * 10 / 100)
+ no_of_similar_tracks = int(total_no_of_tracks * 90 / 100)
+ # Grab similar tracks from the music provider
+ similar_tracks = await prov.get_similar_tracks(
+ prov_track_id=track.item_id, limit=no_of_similar_tracks
+ )
+ # Merge album content with similar tracks
+ dynamic_playlist = [
+ *sorted(top_tracks, key=lambda n: random())[:no_of_artist_tracks],
+ *sorted(similar_tracks, key=lambda n: random())[:no_of_similar_tracks],
+ ]
+ return sorted(dynamic_playlist, key=lambda n: random())
+
+ async def _get_dynamic_tracks(
+ self, media_item: Artist, limit: int = 25
+ ) -> List[Track]:
+ """Get dynamic list of tracks for given item, fallback/default implementation."""
+ # TODO: query metadata provider(s) to get similar tracks (or tracks from similar artists)
+ raise UnsupportedFeaturedException(
+ "No Music Provider found that supports requesting similar tracks."
+ )
+
async def _match(self, db_artist: Artist, provider: MusicProvider) -> bool:
"""Try to find matching artists on given provider for the provided (database) artist."""
self.logger.debug(
from __future__ import annotations
from ctypes import Union
+from random import choice, random
from time import time
from typing import Any, List, Optional, Tuple
InvalidDataError,
MediaNotFoundError,
ProviderUnavailableError,
+ UnsupportedFeaturedException,
)
from music_assistant.models.event import MassEvent
from music_assistant.models.media_controller import MediaControllerBase
"""Return playlist tracks for the given provider playlist id."""
playlist = await self.get(item_id, provider, provider_id)
prov = next(x for x in playlist.provider_ids)
- return await self.get_provider_playlist_tracks(
+ return await self._get_provider_playlist_tracks(
prov.item_id,
provider=prov.prov_type,
provider_id=prov.prov_id,
cache_checksum=playlist.metadata.checksum,
)
- async def get_provider_playlist_tracks(
- self,
- item_id: str,
- provider: Optional[ProviderType] = None,
- provider_id: Optional[str] = None,
- cache_checksum: Any = None,
- ) -> List[Track]:
- """Return album tracks for the given provider album id."""
- prov = self.mass.music.get_provider(provider_id or provider)
- if not prov:
- return []
- # prefer cache items (if any)
- cache_key = f"{prov.id}.playlist.{item_id}.tracks"
- if cache := await self.mass.cache.get(cache_key, checksum=cache_checksum):
- return [Track.from_dict(x) for x in cache]
- # no items in cache - get listing from provider
- items = await prov.get_playlist_tracks(item_id)
- # double check if position set
- if items:
- assert (
- items[0].position is not None
- ), "Playlist items require position to be set"
- # store (serializable items) in cache
- self.mass.create_task(
- self.mass.cache.set(
- cache_key, [x.to_dict() for x in items], checksum=cache_checksum
- )
- )
- return items
-
async def add(self, item: Playlist) -> Playlist:
"""Add playlist to local db and return the new database item."""
item.metadata.last_refresh = int(time())
)
self.logger.debug("updated %s in database: %s", item.name, item_id)
return await self.get_db_item(item_id)
+
+ async def _get_provider_playlist_tracks(
+ self,
+ item_id: str,
+ provider: Optional[ProviderType] = None,
+ provider_id: Optional[str] = None,
+ cache_checksum: Any = None,
+ ) -> List[Track]:
+ """Return album tracks for the given provider album id."""
+ prov = self.mass.music.get_provider(provider_id or provider)
+ if not prov:
+ return []
+ # prefer cache items (if any)
+ cache_key = f"{prov.id}.playlist.{item_id}.tracks"
+ if cache := await self.mass.cache.get(cache_key, checksum=cache_checksum):
+ return [Track.from_dict(x) for x in cache]
+ # no items in cache - get listing from provider
+ items = await prov.get_playlist_tracks(item_id)
+ # double check if position set
+ if items:
+ assert (
+ items[0].position is not None
+ ), "Playlist items require position to be set"
+ # store (serializable items) in cache
+ self.mass.create_task(
+ self.mass.cache.set(
+ cache_key, [x.to_dict() for x in items], checksum=cache_checksum
+ )
+ )
+ return items
+
+ async def _get_provider_dynamic_tracks(
+ self,
+ item_id: str,
+ provider: Optional[ProviderType] = None,
+ provider_id: Optional[str] = None,
+ limit: int = 25,
+ ):
+ """Generate a dynamic list of tracks based on the playlist content."""
+ prov = self.mass.music.get_provider(provider_id or provider)
+ if (
+ not prov
+ or MusicProviderFeature.SIMILAR_TRACKS not in prov.supported_features
+ ):
+ return []
+ playlist_tracks = await self._get_provider_playlist_tracks(
+ item_id=item_id, provider=provider, provider_id=provider_id
+ )
+ # Grab a random track from the playlist that we use to obtain similar tracks for
+ track = choice(playlist_tracks)
+ # Calculate no of songs to grab from each list at a 50/50 ratio
+ total_no_of_tracks = limit + limit % 2
+ tracks_per_list = int(total_no_of_tracks / 2)
+ # Grab similar tracks from the music provider
+ similar_tracks = await prov.get_similar_tracks(
+ prov_track_id=track.item_id, limit=tracks_per_list
+ )
+ # Merge playlist content with similar tracks
+ dynamic_playlist = [
+ *sorted(playlist_tracks, key=lambda n: random())[:tracks_per_list],
+ *sorted(similar_tracks, key=lambda n: random())[:tracks_per_list],
+ ]
+ return sorted(dynamic_playlist, key=lambda n: random())
+
+ async def _get_dynamic_tracks(
+ self, media_item: Playlist, limit: int = 25
+ ) -> List[Track]:
+ """Get dynamic list of tracks for given item, fallback/default implementation."""
+ # TODO: query metadata provider(s) to get similar tracks (or tracks from similar artists)
+ raise UnsupportedFeaturedException(
+ "No Music Provider found that supports requesting similar tracks."
+ )
from music_assistant.models.enums import EventType, MediaType, ProviderType
from music_assistant.models.event import MassEvent
from music_assistant.models.media_controller import MediaControllerBase
-from music_assistant.models.media_items import Radio
+from music_assistant.models.media_items import Radio, Track
class RadioController(MediaControllerBase[Radio]):
)
self.logger.debug("updated %s in database: %s", item.name, item_id)
return await self.get_db_item(item_id)
+
+ async def _get_provider_dynamic_tracks(
+ self,
+ item_id: str,
+ provider: Optional[ProviderType] = None,
+ provider_id: Optional[str] = None,
+ limit: int = 25,
+ ) -> List[Track]:
+ """Generate a dynamic list of tracks based on the item's content."""
+ raise NotImplementedError("Dynamic tracks not supported for Radio MediaItem")
+
+ async def _get_dynamic_tracks(
+ self, media_item: Radio, limit: int = 25
+ ) -> List[Track]:
+ """Get dynamic list of tracks for given item, fallback/default implementation."""
+ raise NotImplementedError("Dynamic tracks not supported for Radio MediaItem")
MusicProviderFeature,
ProviderType,
)
-from music_assistant.models.errors import MediaNotFoundError
+from music_assistant.models.errors import (
+ MediaNotFoundError,
+ UnsupportedFeaturedException,
+)
from music_assistant.models.event import MassEvent
from music_assistant.models.media_controller import MediaControllerBase
from music_assistant.models.media_items import (
provider.name,
)
+ async def _get_provider_dynamic_tracks(
+ self,
+ item_id: str,
+ provider: Optional[ProviderType] = None,
+ provider_id: Optional[str] = None,
+ limit: int = 25,
+ ):
+ """Generate a dynamic list of tracks based on the track."""
+ prov = self.mass.music.get_provider(provider_id or provider)
+ if (
+ not prov
+ or MusicProviderFeature.SIMILAR_TRACKS not in prov.supported_features
+ ):
+ return []
+ # Grab similar tracks from the music provider
+ similar_tracks = await prov.get_similar_tracks(
+ prov_track_id=item_id, limit=limit
+ )
+ return similar_tracks
+
+ async def _get_dynamic_tracks(
+ self, media_item: Track, limit: int = 25
+ ) -> List[Track]:
+ """Get dynamic list of tracks for given item, fallback/default implementation."""
+ # TODO: query metadata provider(s) to get similar tracks (or tracks from similar artists)
+ raise UnsupportedFeaturedException(
+ "No Music Provider found that supports requesting similar tracks."
+ )
+
async def add_db_item(self, item: Track, overwrite_existing: bool = False) -> Track:
"""Add a new item record to the database."""
assert isinstance(item, Track), "Not a full Track object"
"""Async initialize of cache module."""
self.__schedule_cleanup_task()
- async def get(self, cache_key, checksum="", default=None):
+ async def get(self, cache_key: str, checksum: Optional[str] = None, default=None):
"""
Get object from cache and return the results.
cacheobject matches the checkum provided
"""
cur_time = int(time.time())
- if not isinstance(checksum, str):
+ if checksum is not None and not isinstance(checksum, str):
checksum = str(checksum)
# try memory cache first
REPLACE = "replace"
NEXT = "next"
ADD = "add"
+ RADIO = "radio"
class CrossFadeMode(Enum):
LIBRARY_TRACKS_EDIT = "library_tracks_edit"
LIBRARY_PLAYLISTS_EDIT = "library_playlists_edit"
LIBRARY_RADIOS_EDIT = "library_radios_edit"
+ # if we can grab 'similar tracks' from the music provider
+ # used to generate dynamic playlists
+ SIMILAR_TRACKS = "similar_tracks"
# playlist-specific features
PLAYLIST_TRACKS_EDIT = "playlist_tracks_edit"
PLAYLIST_CREATE = "playlist_create"
class QueueEmpty(MusicAssistantError):
"""Error raised when trying to start queue stream while queue is empty."""
+
+
+class UnsupportedFeaturedException(MusicAssistantError):
+ """Error raised when a feature is not supported."""
from music_assistant.models.event import MassEvent
from .enums import EventType, MediaType, MusicProviderFeature, ProviderType
-from .media_items import MediaItemType, PagedItems, media_from_dict
+from .media_items import MediaItemType, PagedItems, Track, media_from_dict
if TYPE_CHECKING:
from music_assistant.mass import MusicAssistant
MassEvent(EventType.MEDIA_ITEM_DELETED, db_item.uri, db_item)
)
self.logger.debug("deleted item with id %s from database", item_id)
+
+ async def dynamic_tracks(
+ self,
+ item_id: str,
+ provider: Optional[ProviderType] = None,
+ provider_id: Optional[str] = None,
+ limit: int = 25,
+ ) -> List[Track]:
+ """Return a dynamic list of tracks based on the given item."""
+ ref_item = await self.get(item_id, provider, provider_id)
+ for prov_id in ref_item.provider_ids:
+ prov = self.mass.music.get_provider(prov_id.prov_id)
+ if not prov.available:
+ continue
+ if MusicProviderFeature.SIMILAR_TRACKS not in prov.supported_features:
+ continue
+ return await self._get_provider_dynamic_tracks(
+ item_id=prov_id.item_id,
+ provider=prov_id.prov_type,
+ provider_id=prov_id.prov_id,
+ limit=limit,
+ )
+ # Fallback to the default implementation
+ return await self._get_dynamic_tracks(ref_item)
+
+ @abstractmethod
+ async def _get_provider_dynamic_tracks(
+ self,
+ item_id: str,
+ provider: Optional[ProviderType] = None,
+ provider_id: Optional[str] = None,
+ limit: int = 25,
+ ) -> List[Track]:
+ """Generate a dynamic list of tracks based on the item's content."""
+
+ @abstractmethod
+ async def _get_dynamic_tracks(
+ self, media_item: ItemCls, limit: int = 25
+ ) -> List[Track]:
+ """Get dynamic list of tracks for given item, fallback/default implementation."""
"""Create a new playlist on provider with given name."""
raise NotImplementedError
+ async def get_similar_tracks(self, prov_track_id, limit=25) -> List[Track]:
+ """Retrieve a dynamic list of similar tracks based on the provided track."""
+ raise NotImplementedError
+
async def get_stream_details(self, item_id: str) -> StreamDetails | None:
"""Get streamdetails for a track/radio."""
raise NotImplementedError
self._last_player_update: int = 0
self._last_stream_id: str = ""
self._snapshot: Optional[QueueSnapShot] = None
+ self._radio_source: List[MediaItemType] = []
self.announcement_in_progress: bool = False
async def setup(self) -> None:
QueueOption.REPLACE -> Replace queue contents with these items
QueueOption.NEXT -> Play item(s) after current playing item
QueueOption.ADD -> Append new items at end of the queue
+ QueueOption.RADIO -> Fill the queue contents with dynamic content based on the item(s)
:param passive: if passive set to true the stream url will not be sent to the player.
"""
if self.announcement_in_progress:
self.logger.warning("Ignore queue command: An announcement is in progress")
return
+
# a single item or list of items may be provided
if not isinstance(media, list):
media = [media]
- queue_items = []
+
+ tracks: List[MediaItemType] = []
for item in media:
# parse provided uri into a MA MediaItem or Basic QueueItem from URL
if isinstance(item, str):
media_item = item
# collect tracks to play
- tracks = []
- if media_item.media_type == MediaType.ARTIST:
- tracks = await self.mass.music.artists.toptracks(
+ if queue_opt == QueueOption.RADIO:
+ # For dynamic/radio mode, the source items are stored and unpacked dynamically
+ tracks += [media_item]
+ elif media_item.media_type == MediaType.ARTIST:
+ tracks += await self.mass.music.artists.toptracks(
media_item.item_id, provider=media_item.provider
)
elif media_item.media_type == MediaType.ALBUM:
- tracks = await self.mass.music.albums.tracks(
+ tracks += await self.mass.music.albums.tracks(
media_item.item_id, provider=media_item.provider
)
elif media_item.media_type == MediaType.PLAYLIST:
- tracks = await self.mass.music.playlists.tracks(
+ tracks += await self.mass.music.playlists.tracks(
media_item.item_id, provider=media_item.provider
)
- elif media_item.media_type in (
- MediaType.RADIO,
- MediaType.TRACK,
- ):
- # single item
- tracks = [media_item]
+ else:
+ # single track or radio item
+ tracks += [media_item]
- # only add available items
- for track in tracks:
- if not track.available:
- continue
- queue_items.append(QueueItem.from_media_item(track))
+ # Handle Radio playback: clear queue and request first batch
+ if queue_opt == QueueOption.RADIO:
+ # clear existing items before we start radio
+ await self.clear()
+ # load the first batch
+ await self._load_radio_tracks(tracks)
+ if not passive:
+ await self.play_index(0)
+ return
+
+ # only add available items
+ queue_items = [QueueItem.from_media_item(x) for x in tracks if x.available]
# clear queue first if it was finished
if self._current_index and self._current_index >= (len(self._items) - 1):
self._current_index = None
self._items = []
- # load items into the queue, make sure we have valid values
- queue_items = [x for x in queue_items if isinstance(x, QueueItem)]
+ # if adding more than 50 items in play/next mode, treat as replace
+ if len(queue_items) > 50 and queue_opt in (QueueOption.PLAY, QueueOption.NEXT):
+ queue_opt = QueueOption.REPLACE
+
+ # load the items into the queue
if queue_opt == QueueOption.REPLACE:
await self.load(queue_items, passive)
- elif (
- queue_opt in [QueueOption.PLAY, QueueOption.NEXT] and len(queue_items) > 100
- ):
- await self.load(queue_items, passive)
elif queue_opt == QueueOption.NEXT:
await self.insert(queue_items, 1, passive)
elif queue_opt == QueueOption.PLAY:
elif queue_opt == QueueOption.ADD:
await self.append(queue_items)
+ async def _load_radio_tracks(
+ self, radio_items: Optional[List[MediaItemType]] = None
+ ) -> None:
+ """Fill the Queue with (additional) Radio tracks."""
+ if radio_items:
+ self._radio_source = radio_items
+ assert self._radio_source, "No Radio item(s) loaded/active!"
+
+ tracks: List[MediaItemType] = []
+ # grab dynamic tracks for (all) source items
+ # shuffle the source items, just in case
+ for radio_item in random.sample(self._radio_source, len(self._radio_source)):
+ ctrl = self.mass.music.get_controller(radio_item.media_type)
+ tracks += await ctrl.dynamic_tracks(
+ item_id=radio_item.item_id, provider=radio_item.provider
+ )
+ # make sure we do not grab too much items
+ if len(tracks) >= 50:
+ break
+ # fill queue - filter out unavailable items
+ queue_items = [QueueItem.from_media_item(x) for x in tracks if x.available]
+ await self.append(queue_items)
+
async def play_announcement(self, url: str, prepend_alert: bool = False) -> str:
"""
Play given uri as Announcement on the queue.
async def load(self, queue_items: List[QueueItem], passive: bool = False) -> None:
"""Load (overwrite) queue with new items."""
+ # reset radio source if a queue load is executed
+ self._radio_source = []
for index, item in enumerate(queue_items):
item.sort_index = index
if self.settings.shuffle_enabled and len(queue_items) > 5:
async def clear(self) -> None:
"""Clear all items in the queue."""
+ self._radio_source = []
if self.player.state not in (PlayerState.IDLE, PlayerState.OFF):
await self.stop()
await self.update_items([])
if self.player.active_queue != self or not self.active:
return
- new_index = self._current_index
track_time = self._current_item_elapsed_time
new_item_loaded = False
if self.player.state == PlayerState.PLAYING and self.player.elapsed_time > 0:
new_index, track_time = self.__get_queue_stream_index()
- # process new index
- if self._current_index != new_index:
- # queue track updated
- self._current_index = new_index
+
+ # process new index
+ if self._current_index != new_index:
+ # queue index updated
+ self._current_index = new_index
+ # watch dynamic radio items refill if needed
+ fill_index = len(self._items) - 5
+ if self._radio_source and (new_index >= fill_index):
+ self.mass.create_task(self._load_radio_tracks())
+
# check if a new track is loaded, wait for the streamdetails
if (
self.current_item
# being higher than the number of items to detect end of queue and/or handle repeat.
if cur_index is None:
return 0
- return cur_index + 1
+ next_index = cur_index + 1
+ return next_index
def signal_update(self, items_changed: bool = False) -> None:
"""Signal state changed of this queue."""
"""Export object to dict."""
cur_item = self.current_item.to_dict() if self.current_item else None
next_item = self.next_item.to_dict() if self.next_item else None
+
return {
"queue_id": self.queue_id,
"player": self.player.player_id,
"next_item": next_item,
"items": len(self._items),
"settings": self.settings.to_dict(),
+ "radio_source": [x.to_dict() for x in self._radio_source[:5]],
}
async def update_items(self, queue_items: List[QueueItem]) -> None:
MusicProviderFeature.SEARCH,
MusicProviderFeature.ARTIST_ALBUMS,
MusicProviderFeature.ARTIST_TOPTRACKS,
+ MusicProviderFeature.SIMILAR_TRACKS,
)
async def setup(self) -> bool:
f"playlists/{prov_playlist_id}/tracks", data=data
)
+ async def get_similar_tracks(self, prov_track_id, limit=25) -> List[Track]:
+ """Retrieve a dynamic list of tracks based on the provided item."""
+ endpoint = "recommendations"
+ items = await self._get_data(endpoint, seed_tracks=prov_track_id, limit=limit)
+ return [
+ await self._parse_track(item)
+ for item in items["tracks"]
+ if (item and item["id"])
+ ]
+
async def get_stream_details(self, item_id: str) -> StreamDetails:
"""Return the content details for the given track when it will be streamed."""
# make sure a valid track is requested.
ytm = ytmusicapi.YTMusic()
try:
artist = ytm.get_artist(channelId=prov_artist_id)
+ # ChannelId can sometimes be different and original ID is not part of the response
+ artist["channelId"] = prov_artist_id
except KeyError:
user = ytm.get_user(channelId=prov_artist_id)
artist = {"channelId": prov_artist_id, "name": user["name"]}
return await loop.run_in_executor(None, _add_playlist_tracks)
+async def get_song_radio_tracks(
+ headers: Dict[str, str], username: str, prov_item_id: str, limit=25
+) -> Dict[str, str]:
+ """Async wrapper around the ytmusicapi radio function."""
+ user = username if is_brand_account(username) else None
+
+ def _get_song_radio_tracks():
+ ytm = ytmusicapi.YTMusic(auth=json.dumps(headers), user=user)
+ playlist_id = f"RDAMVM{prov_item_id}"
+ result = ytm.get_watch_playlist(
+ videoId=prov_item_id, playlistId=playlist_id, limit=limit
+ )
+ # Replace inconsistensies for easier parsing
+ for track in result["tracks"]:
+ if track.get("thumbnail"):
+ track["thumbnails"] = track["thumbnail"]
+ del track["thumbnail"]
+ if track.get("length"):
+ track["duration"] = get_sec(track["length"])
+ return result
+
+ loop = asyncio.get_running_loop()
+ return await loop.run_in_executor(None, _get_song_radio_tracks)
+
+
async def search(query: str, ytm_filter: str = None, limit: int = 20) -> List[Dict]:
"""Async wrapper around the ytmusicapi search function."""
def is_brand_account(username: str) -> bool:
"""Check if the provided username is a brand-account."""
return len(username) == 21 and username.isdigit()
+
+
+def get_sec(time_str):
+ """Get seconds from time."""
+ parts = time_str.split(":")
+ if len(parts) == 3:
+ return int(parts[0]) * 3600 + int(parts[1]) * 60 + int(parts[2])
+ if len(parts) == 2:
+ return int(parts[0]) * 60 + int(parts[1])
+ return 0
get_library_playlists,
get_library_tracks,
get_playlist,
+ get_song_radio_tracks,
get_track,
library_add_remove_album,
library_add_remove_artist,
MusicProviderFeature.SEARCH,
MusicProviderFeature.ARTIST_ALBUMS,
MusicProviderFeature.ARTIST_TOPTRACKS,
+ MusicProviderFeature.SIMILAR_TRACKS,
)
async def setup(self) -> bool:
async def get_album_tracks(self, prov_album_id: str) -> List[Track]:
"""Get album tracks for given album id."""
album_obj = await get_album(prov_album_id=prov_album_id)
- return [
- await self._parse_track(track)
- for track in album_obj["tracks"]
- if "tracks" in album_obj
- ]
+ if not album_obj.get("tracks"):
+ return []
+ tracks = []
+ for idx, track_obj in enumerate(album_obj["tracks"], 1):
+ track = await self._parse_track(track_obj=track_obj)
+ track.disc_number = 0
+ track.track_number = idx
+ tracks.append(track)
+ return tracks
async def get_artist(self, prov_artist_id) -> Artist:
"""Get full artist details by id."""
return []
async def get_artist_toptracks(self, prov_artist_id) -> List[Track]:
- """Get a list of 5 most popular tracks for the given artist."""
+ """Get a list of 25 most popular tracks for the given artist."""
artist_obj = await get_artist(prov_artist_id=prov_artist_id)
- if "songs" in artist_obj and "results" in artist_obj["songs"]:
- return [
- await self.get_track(track["videoId"])
- for track in artist_obj["songs"]["results"]
- if track.get("videoId")
- ]
+ if artist_obj.get("songs") and artist_obj["songs"].get("browseId"):
+ prov_playlist_id = artist_obj["songs"]["browseId"]
+ playlist_tracks = await self.get_playlist_tracks(
+ prov_playlist_id=prov_playlist_id
+ )
+ return playlist_tracks[:25]
return []
async def library_add(self, prov_item_id, media_type: MediaType) -> None:
username=self.config.username,
)
+ async def get_similar_tracks(self, prov_track_id, limit=25) -> List[Track]:
+ """Retrieve a dynamic list of tracks based on the provided item."""
+ result = []
+ result = await get_song_radio_tracks(
+ headers=self._headers,
+ username=self.config.username,
+ prov_item_id=prov_track_id,
+ limit=limit,
+ )
+ if "tracks" in result:
+ tracks = []
+ for track in result["tracks"]:
+ # Playlist tracks sometimes do not have a valid artist id
+ # In that case, call the API for track details based on track id
+ try:
+ track = await self._parse_track(track)
+ if track:
+ tracks.append(track)
+ except InvalidDataError:
+ track = await self.get_track(track["videoId"])
+ if track:
+ tracks.append(track)
+ return tracks
+ return []
+
async def get_stream_details(self, item_id: str) -> StreamDetails:
"""Return the content details for the given track when it will be streamed."""
data = {
track.album = await self._parse_album(album, album["id"])
if "isExplicit" in track_obj:
track.metadata.explicit = track_obj["isExplicit"]
- if "duration" in track_obj and track_obj["duration"].isdigit():
- track.duration = track_obj["duration"]
+ if "duration" in track_obj and str(track_obj["duration"]).isdigit():
+ track.duration = int(track_obj["duration"])
elif (
"duration_seconds" in track_obj
and str(track_obj["duration_seconds"]).isdigit()
):
- track.duration = track_obj["duration_seconds"]
+ track.duration = int(track_obj["duration_seconds"])
available = True
if "isAvailable" in track_obj:
available = track_obj["isAvailable"]