MediaItemImage,
MediaItemMetadata,
MediaItemType,
- PagedItems,
Playlist,
PlaylistTrack,
Radio,
limit: int | None = None,
offset: int | None = None,
order_by: str | None = None,
- ) -> PagedItems[Track]:
+ ) -> list[Track]:
"""Get Track listing from the server."""
- return PagedItems.parse(
- await self.client.send_command(
+ return [
+ Track.from_dict(obj)
+ for obj in await self.client.send_command(
"music/tracks/library_items",
favorite=favorite,
search=search,
limit=limit,
offset=offset,
order_by=order_by,
- ),
- Track,
- )
+ )
+ ]
async def get_track(
self,
limit: int | None = None,
offset: int | None = None,
order_by: str | None = None,
- ) -> PagedItems[Album]:
+ ) -> list[Album]:
"""Get Albums listing from the server."""
- return PagedItems.parse(
- await self.client.send_command(
+ return [
+ Album.from_dict(obj)
+ for obj in await self.client.send_command(
"music/albums/library_items",
favorite=favorite,
search=search,
limit=limit,
offset=offset,
order_by=order_by,
- ),
- Album,
- )
+ )
+ ]
async def get_album(
self,
offset: int | None = None,
order_by: str | None = None,
album_artists_only: bool = False,
- ) -> PagedItems[Artist]:
+ ) -> list[Artist]:
"""Get Artists listing from the server."""
- return PagedItems.parse(
- await self.client.send_command(
+ return [
+ Artist.from_dict(obj)
+ for obj in await self.client.send_command(
"music/artists/library_items",
favorite=favorite,
search=search,
offset=offset,
order_by=order_by,
album_artists_only=album_artists_only,
- ),
- Artist,
- )
+ )
+ ]
async def get_artist(
self,
limit: int | None = None,
offset: int | None = None,
order_by: str | None = None,
- ) -> PagedItems[Playlist]:
+ ) -> list[Playlist]:
"""Get Playlists listing from the server."""
- return PagedItems.parse(
- await self.client.send_command(
+ return [
+ Playlist.from_dict(obj)
+ for obj in await self.client.send_command(
"music/playlists/library_items",
favorite=favorite,
search=search,
limit=limit,
offset=offset,
order_by=order_by,
- ),
- Playlist,
- )
+ )
+ ]
async def get_playlist(
self,
provider_instance_id_or_domain: str,
limit: int | None = None,
offset: int | None = None,
- ) -> PagedItems[PlaylistTrack]:
+ ) -> list[PlaylistTrack]:
"""Get tracks for given playlist."""
- return PagedItems.parse(
- await self.client.send_command(
+ return [
+ PlaylistTrack.from_dict(obj)
+ for obj in await self.client.send_command(
"music/playlists/playlist_tracks",
item_id=item_id,
provider_instance_id_or_domain=provider_instance_id_or_domain,
limit=limit,
offset=offset,
- ),
- PlaylistTrack,
- )
+ )
+ ]
async def add_playlist_tracks(self, db_playlist_id: str | int, uris: list[str]) -> None:
"""Add multiple tracks to playlist. Creates background tasks to process the action."""
limit: int | None = None,
offset: int | None = None,
order_by: str | None = None,
- ) -> PagedItems[Radio]:
+ ) -> list[Radio]:
"""Get Radio listing from the server."""
- return PagedItems.parse(
- await self.client.send_command(
+ return [
+ Radio.from_dict(obj)
+ for obj in await self.client.send_command(
"music/radios/library_items",
favorite=favorite,
search=search,
limit=limit,
offset=offset,
order_by=order_by,
- ),
- Radio,
- )
+ )
+ ]
async def get_radio(
self,
path: str | None = None,
limit: int | None = None,
offset: int | None = None,
- ) -> PagedItems[MediaItemType]:
+ ) -> list[MediaItemType]:
"""Browse Music providers."""
- return PagedItems.parse(
- await self.client.send_command(
+ return [
+ media_from_dict(obj)
+ for obj in await self.client.send_command(
"music/browse",
path=path,
limit=limit,
offset=offset,
- ),
- MediaItemType,
- )
+ )
+ ]
async def recently_played(
self, limit: int = 10, media_types: list[MediaType] | None = None
from typing import TYPE_CHECKING
from music_assistant.common.models.enums import EventType, QueueOption, RepeatMode
-from music_assistant.common.models.media_items import PagedItems
from music_assistant.common.models.player_queue import PlayerQueue
from music_assistant.common.models.queue_item import QueueItem
async def get_player_queue_items(
self, queue_id: str, limit: int = 500, offset: int = 0
- ) -> PagedItems[QueueItem]:
+ ) -> list[QueueItem]:
"""Get all QueueItems for given PlayerQueue."""
- return PagedItems.parse(
- await self.client.send_command(
+ return [
+ QueueItem.from_dict(obj)
+ for obj in await self.client.send_command(
"player_queues/items", queue_id=queue_id, limit=limit, offset=offset
)
- )
+ ]
async def get_active_queue(self, player_id: str) -> PlayerQueue:
"""Return the current active/synced queue for a player."""
from collections.abc import Iterable
from dataclasses import dataclass, field, fields
-from typing import TYPE_CHECKING, Any, Generic, Self, TypeVar, cast
+from typing import TYPE_CHECKING, Any, Self, TypeVar, cast
from mashumaro import DataClassDictMixin
)
-class PagedItems(Generic[_T]):
- """Model for a paged listing."""
-
- def __init__(
- self,
- items: list[_T],
- limit: int,
- offset: int,
- total: int | None,
- count: int | None = None,
- ):
- """Initialize PagedItems."""
- self.items = items
- self.count = count = count or len(items)
- self.limit = limit
- self.offset = offset
- self.total = total
-
- def to_dict(self, *args, **kwargs) -> dict[str, Any]:
- """Return PagedItems as serializable dict."""
- return {
- "items": [x.to_dict() for x in self.items],
- "count": self.count,
- "limit": self.limit,
- "offset": self.offset,
- "total": self.total,
- }
-
- @classmethod
- def parse(cls, raw: dict[str, Any], item_type: type[MediaItemType]) -> Self[MediaItemType]:
- """Parse PagedItems object including correct item type."""
- return PagedItems(
- items=[item_type.from_dict(x) for x in raw["items"]],
- count=raw["count"],
- limit=raw["limit"],
- offset=raw["offset"],
- total=raw["total"],
- )
-
-
@dataclass(kw_only=True)
class SearchResults(DataClassDictMixin):
"""Model for results from a search query."""
Artist,
ItemMapping,
MediaType,
- PagedItems,
Track,
UniqueList,
)
self.mass.register_api_command(f"music/{api_base}/artist_albums", self.albums)
self.mass.register_api_command(f"music/{api_base}/artist_tracks", self.tracks)
+ async def library_count(
+ self, favorite_only: bool = False, album_artists_only: bool = False
+ ) -> int:
+ """Return the total number of items in the library."""
+ sql_query = self.base_query
+ if favorite_only:
+ sql_query += f" WHERE {self.db_table}.favorite = 1"
+ if album_artists_only:
+ sql_query += " WHERE " if "WHERE" not in sql_query else " AND "
+ sql_query += (
+ f"artists.item_id in (select {DB_TABLE_ALBUM_ARTISTS}.artist_id "
+ f"from {DB_TABLE_ALBUM_ARTISTS})"
+ )
+ return await self.mass.music.database.get_count_from_query(sql_query)
+
async def library_items(
self,
favorite: bool | None = None,
extra_query: str | None = None,
extra_query_params: dict[str, Any] | None = None,
album_artists_only: bool = False,
- ) -> PagedItems:
+ ) -> list[Artist]:
"""Get in-database (album) artists."""
if album_artists_only:
artist_query = (
Album,
ItemMapping,
MediaItemType,
- PagedItems,
ProviderMapping,
Track,
media_from_dict,
self.logger = logging.getLogger(f"{MASS_LOGGER_NAME}.music.{self.media_type.value}")
# register (base) api handlers
self.api_base = api_base = f"{self.media_type}s"
+ self.mass.register_api_command(f"music/{api_base}/count", self.library_count)
self.mass.register_api_command(f"music/{api_base}/library_items", self.library_items)
self.mass.register_api_command(f"music/{api_base}/get", self.get)
self.mass.register_api_command(f"music/{api_base}/get_{self.media_type}", self.get)
self.mass.signal_event(EventType.MEDIA_ITEM_DELETED, library_item.uri, library_item)
self.logger.debug("deleted item with id %s from database", db_id)
+ async def library_count(self, favorite_only: bool = False) -> int:
+ """Return the total number of items in the library."""
+ sql_query = self.base_query
+ if favorite_only:
+ sql_query += f" WHERE {self.db_table}.favorite = 1"
+ return await self.mass.music.database.get_count_from_query(sql_query)
+
async def library_items(
self,
favorite: bool | None = None,
order_by: str = "sort_name",
extra_query: str | None = None,
extra_query_params: dict[str, Any] | None = None,
- ) -> PagedItems[ItemCls]:
+ ) -> list[ItemCls]:
"""Get in-database items."""
- items = await self._get_library_items_by_query(
+ return await self._get_library_items_by_query(
favorite=favorite,
search=search,
limit=limit,
extra_query=extra_query,
extra_query_params=extra_query_params,
)
- count = len(items)
- if 0 < count < limit:
- total = offset + count
- else:
- total = await self._get_library_items_by_query(
- favorite=favorite,
- search=search,
- limit=limit,
- offset=offset,
- order_by=order_by,
- extra_query=extra_query,
- extra_query_params=extra_query_params,
- count_only=True,
- )
- return PagedItems(items=items, limit=limit, offset=offset, count=count, total=total)
async def iter_library_items(
self,
order_by: str | None = None,
extra_query: str | None = None,
extra_query_params: dict[str, Any] | None = None,
- count_only: bool = False,
) -> list[ItemCls] | int:
"""Fetch MediaItem records from database given a custom (WHERE) clause."""
sql_query = self.base_query
if query_parts:
sql_query += " WHERE " + " AND ".join(query_parts)
# build final query
- if count_only:
- return await self.mass.music.database.get_count_from_query(sql_query, query_params)
if order_by:
if sort_key := SORT_KEYS.get(order_by):
sql_query += f" ORDER BY {sort_key}"
ProviderUnavailableError,
UnsupportedFeaturedException,
)
-from music_assistant.common.models.media_items import PagedItems, Playlist, PlaylistTrack, Track
+from music_assistant.common.models.media_items import Playlist, PlaylistTrack, Track
from music_assistant.constants import DB_TABLE_PLAYLISTS
from music_assistant.server.models.music_provider import MusicProvider
offset: int = 0,
limit: int = 50,
prefer_library_items: bool = True,
- ) -> PagedItems[PlaylistTrack]:
+ ) -> list[PlaylistTrack]:
"""Return playlist tracks for the given provider playlist id."""
playlist = await self.get(
item_id,
final_tracks.append(track)
else:
final_tracks = tracks
- # We set total to None as we have no idea how many tracks there are.
- # The frontend can figure this out and stop paging when it gets an empty list.
- # Exception is when we receive a result that is either much higher
- # or smaller than the limit - in that case we consider the list final.
- total = None
- count = len(final_tracks)
- if count and (count < (limit - 10) or count > (limit + 10)):
- total = offset + len(final_tracks)
- return PagedItems(items=final_tracks, limit=limit, offset=offset, total=total, count=count)
+ return final_tracks
async def create_playlist(
self, name: str, provider_instance_or_domain: str | None = None
limit=limit,
prefer_library_items=prefer_library_items,
)
- result += paged_items.items
- if paged_items.total is not None and len(result) >= paged_items.total:
+ result += paged_items
+ if len(paged_items) > limit:
+ # this happens if the provider doesn't support paging
+ # and it does simply return all items in one call
+ break
+ if len(paged_items) == 0:
break
- if paged_items.count == 0:
+ if len(paged_items) < (limit - 20):
+ # if get get less than 30 items, we assume this is the end
+ # note that we account for the fact that the provider might
+ # return less than the limit (e.g. 20 items) due to track unavailability
break
- if paged_items.total is None and paged_items.items == result:
+ if paged_items == result:
# safety guard for malfunctioning provider
break
- offset += paged_items.count
+ offset += limit
return result
async def _add_library_item(self, item: Playlist) -> int:
# retrieve metedata for the playlist from the tracks (such as genres etc.)
# TODO: retrieve style/mood ?
playlist_items = await self.mass.music.playlists.tracks(playlist.item_id, playlist.provider)
- for track in playlist_items.items:
+ for track in playlist_items:
if track.image:
all_playlist_tracks_images.add(track.image)
if track.metadata.genres:
MusicAssistantError,
ProviderUnavailableError,
)
-from music_assistant.common.models.media_items import (
- BrowseFolder,
- MediaItemType,
- PagedItems,
- SearchResults,
-)
+from music_assistant.common.models.media_items import BrowseFolder, MediaItemType, SearchResults
from music_assistant.common.models.provider import SyncTask
from music_assistant.common.models.streamdetails import LoudnessMeasurement
from music_assistant.constants import (
return result
@api_command("music/browse")
- async def browse(
- self, offset: int, limit: int, path: str | None = None
- ) -> PagedItems[MediaItemType]:
+ async def browse(self, offset: int, limit: int, path: str | None = None) -> list[MediaItemType]:
"""Browse Music providers."""
if not path or path == "root":
# root level; folder per provider
name=prov.name,
)
)
- return PagedItems(items=root_items, limit=limit, offset=offset, total=len(root_items))
+ return root_items
# provider level
prepend_items: list[MediaItemType] = []
BrowseFolder(item_id="root", provider="library", path="root", name="..")
)
if not prov:
- return PagedItems(
- items=prepend_items, limit=limit, offset=offset, total=len(prepend_items)
- )
+ return prepend_items
elif offset == 0:
back_path = f"{provider_instance}://" + "/".join(sub_path.split("/")[:-1])
prepend_items.append(
)
# limit -1 to account for the prepended items
prov_items = await prov.browse(path, offset=offset, limit=limit)
- prov_items.items = prepend_items + prov_items.items
- if prov_items.total is not None:
- prov_items.total += len(prepend_items)
- return prov_items
+ return prepend_items + prov_items
@api_command("music/recently_played_items")
async def recently_played(
PlayerUnavailableError,
QueueEmpty,
)
-from music_assistant.common.models.media_items import (
- AlbumTrack,
- MediaItemType,
- PagedItems,
- media_from_dict,
-)
+from music_assistant.common.models.media_items import AlbumTrack, MediaItemType, media_from_dict
from music_assistant.common.models.player import PlayerMedia
from music_assistant.common.models.player_queue import PlayerQueue
from music_assistant.common.models.queue_item import QueueItem
return self._queues.get(queue_id)
@api_command("player_queues/items")
- def items(self, queue_id: str, limit: int = 500, offset: int = 0) -> PagedItems[QueueItem]:
+ def items(self, queue_id: str, limit: int = 500, offset: int = 0) -> list[QueueItem]:
"""Return all QueueItems for given PlayerQueue."""
if queue_id not in self._queue_items:
- return PagedItems(items=[], limit=limit, offset=offset, total=0)
+ return []
- return PagedItems(
- items=self._queue_items[queue_id][offset : offset + limit],
- limit=limit,
- offset=offset,
- total=len(self._queue_items[queue_id]),
- )
+ return self._queue_items[queue_id][offset : offset + limit]
@api_command("player_queues/get_active_queue")
def get_active_queue(self, player_id: str) -> PlayerQueue:
return
if queue := self.get(queue_id):
queue.stream_finished = None
- # simply forward the command to underlying player
- await self.mass.players.cmd_stop(queue_id)
+ # forward the actual stop command to the player provider
+ if player_provider := self.mass.players.get_player_provider(queue_id):
+ await player_provider.cmd_stop(queue_id)
@api_command("player_queues/play")
async def play(self, queue_id: str) -> None:
self.logger.warning("Ignore queue command: An announcement is in progress")
return
if self._queues[queue_id].state == PlayerState.PAUSED:
- # simply forward the command to underlying player
- await self.mass.players.cmd_play(queue_id)
+ # forward the actual stop command to the player provider
+ if player_provider := self.mass.players.get_player_provider(queue_id):
+ await player_provider.cmd_play(queue_id)
else:
await self.resume(queue_id)
# if player does not support pause, we need to send stop
await self.stop(queue_id)
return
- # simply forward the command to underlying player
- await self.mass.players.cmd_pause(queue_id)
+ # forward the actual stop command to the player provider
+ if player_provider := self.mass.players.get_player_provider(queue_id):
+ await player_provider.cmd_pause(queue_id)
@api_command("player_queues/play_pause")
async def play_pause(self, queue_id: str) -> None:
queue.current_index = index
queue.index_in_buffer = index
queue.flow_mode_start_index = index
- player_needs_flow_mode = self.mass.config.get_raw_player_config_value(
- queue_id, CONF_FLOW_MODE, False
+ player_needs_flow_mode = await self.mass.config.get_player_config_value(
+ queue_id, CONF_FLOW_MODE
)
next_index = self._get_next_index(queue_id, index, allow_repeat=False)
queue.flow_mode = player_needs_flow_mode and next_index is not None
# player does not support enqueue next feature.
# we wait for the player to stop after it reaches the end of the track
- if queue.stream_finished and queue.state == PlayerState.IDLE:
+ if (
+ (not queue.flow_mode or queue.repeat_mode == RepeatMode.ALL)
+ and queue.stream_finished
+ and queue.state == PlayerState.IDLE
+ ):
queue.stream_finished = None
self.mass.create_task(_enqueue_next(queue.current_index, False))
return
Artist,
BrowseFolder,
MediaItemType,
- PagedItems,
Playlist,
Radio,
SearchResults,
return await self.get_radio(prov_item_id)
return await self.get_track(prov_item_id)
- async def browse(self, path: str, offset: int, limit: int) -> PagedItems[MediaItemType]:
+ async def browse(self, path: str, offset: int, limit: int) -> list[MediaItemType]:
"""Browse this provider's items.
:param path: The path to browse, (e.g. provider_id://artists).
items.append(item)
if len(items) >= limit:
break
- # explicitly set total to None as we don't know the total count
- total = None
else:
# no subpath: return main listing
if ProviderFeature.LIBRARY_ARTISTS in self.supported_features:
label="radios",
)
)
- total = len(items)
- return PagedItems(items=items, limit=limit, offset=offset, total=total)
+ return items
async def recommendations(self) -> list[MediaItemType]:
"""Get this provider's recommendations.
) -> list[Track]:
"""Get playlist tracks."""
if prov_playlist_id in BUILTIN_PLAYLISTS:
- result = await self._get_builtin_playlist_tracks(prov_playlist_id)
- return result[offset : offset + limit]
+ if offset:
+ # paging not supported, we always return the whole list at once
+ return []
+ return await self._get_builtin_playlist_tracks(prov_playlist_id)
# user created universal playlist
result: list[Track] = []
playlist_items = await self._read_playlist_file_items(prov_playlist_id, offset, limit)
res = await self.mass.music.tracks.library_items(
favorite=True, limit=2500, order_by="random_play_count"
)
- for idx, item in enumerate(res.items, 1):
+ for idx, item in enumerate(res, 1):
item.position = idx
result.append(item)
return result
res = await self.mass.music.tracks.library_items(
limit=500, order_by="random_play_count"
)
- for idx, item in enumerate(res.items, 1):
+ for idx, item in enumerate(res, 1):
item.position = idx
result.append(item)
return result
if builtin_playlist_id == RANDOM_ALBUM:
- for random_album in (
- await self.mass.music.albums.library_items(limit=1, order_by="random")
- ).items:
+ for random_album in await self.mass.music.albums.library_items(
+ limit=1, order_by="random"
+ ):
# use the function specified in the queue controller as that
# already handles unwrapping an album by user preference
tracks = await self.mass.music.albums.tracks(
result.append(track)
return result
if builtin_playlist_id == RANDOM_ARTIST:
- for random_artist in (
- await self.mass.music.artists.library_items(limit=1, order_by="random")
- ).items:
+ for random_artist in await self.mass.music.artists.library_items(
+ limit=1, order_by="random"
+ ):
# use the function specified in the queue controller as that
# already handles unwrapping an artist by user preference
tracks = await self.mass.music.artists.tracks(
) -> list[Track]:
"""Get playlist tracks."""
result: list[Track] = []
- # TODO: access the underlying paging on the deezer api instead of this hack
+ if offset:
+ # paging not supported, we always return the whole list at once
+ return []
+ # TODO: access the underlying paging on the deezer api (if possible))
playlist = await self.client.get_playlist(int(prov_playlist_id))
playlist_tracks = await playlist.get_tracks()
- for index, deezer_track in enumerate(playlist_tracks[offset : offset + limit], 1):
+ for index, deezer_track in enumerate(playlist_tracks, 1):
result.append(
self.parse_track(
track=deezer_track,
MediaItemImage,
MediaItemType,
MediaType,
- PagedItems,
Playlist,
ProviderMapping,
SearchResults,
)
return result
- async def browse(self, path: str, offset: int, limit: int) -> PagedItems[MediaItemType]:
+ async def browse(self, path: str, offset: int, limit: int) -> list[MediaItemType]:
"""Browse this provider's items.
:param path: The path to browse, (e.g. provid://artists).
index += 1
if len(items) >= limit:
break
- total = len(items) if len(items) < limit else None
- return PagedItems(items=items, limit=limit, offset=offset, total=total)
+ return items
async def sync_library(self, media_types: tuple[MediaType, ...]) -> None:
"""Run library sync for this provider."""
) -> list[Track]:\r
"""Get playlist tracks."""\r
result: list[Track] = []\r
+ if offset:\r
+ # paging not supported, we always return the whole list at once\r
+ return []\r
# TODO: Does Jellyfin support paging here?\r
jellyfin_playlist = API.get_item(self._jellyfin_server.jellyfin, prov_playlist_id)\r
playlist_items = await self._get_children(\r
)\r
if not playlist_items:\r
return result\r
- for index, jellyfin_track in enumerate(playlist_items[offset : offset + limit], 1):\r
+ for index, jellyfin_track in enumerate(playlist_items, 1):\r
try:\r
if track := await self._parse_track(jellyfin_track):\r
if not track.position:\r
except (ParameterError, DataNotFoundError) as e:
msg = f"Playlist {prov_playlist_id} not found"
raise MediaNotFoundError(msg) from e
+ if offset:
+ # paging not supported, we always return the whole list at once
+ return []
# TODO: figure out if subsonic supports paging here
- for index, sonic_song in enumerate(sonic_playlist.songs[offset : offset + limit], 1):
+ for index, sonic_song in enumerate(sonic_playlist.songs, 1):
track = self._parse_track(sonic_song)
track.position = offset + index
result.append(track)
) -> list[Track]:
"""Get playlist tracks."""
result: list[Track] = []
- # TODO: implement paging ?!
+ if offset:
+ # paging not supported, we always return the whole list at once
+ return []
plex_playlist: PlexPlaylist = await self._get_data(prov_playlist_id, PlexPlaylist)
if not (playlist_items := await self._run_async(plex_playlist.items)):
return result
- for index, plex_track in enumerate(playlist_items[offset : offset + limit], 1):
+ for index, plex_track in enumerate(playlist_items, 1):
if track := await self._parse_track(plex_track):
track.position = index
result.append(track)
MediaItemLink,
MediaItemType,
MediaType,
- PagedItems,
ProviderMapping,
Radio,
SearchResults,
return result
- async def browse(self, path: str, offset: int, limit: int) -> PagedItems[MediaItemType]:
+ async def browse(self, path: str, offset: int, limit: int) -> list[MediaItemType]:
"""Browse this provider's items.
:param path: The path to browse, (e.g. provid://artists).
"""
- items: list[BrowseFolder | Radio] = []
subpath = path.split("://", 1)[1]
subsubpath = "" if "/" not in subpath else subpath.split("/")[-1]
if not subpath:
# return main listing
- items = [
+ return [
BrowseFolder(
item_id="popular",
provider=self.domain,
]
if subpath == "popular":
- items = await self.get_by_popularity(limit=limit, offset=offset)
+ return await self.get_by_popularity(limit=limit, offset=offset)
if subpath == "tag":
tags = await self.radios.tags(
reverse=True,
)
tags.sort(key=lambda tag: tag.name)
- items = [
+ return [
BrowseFolder(
item_id=tag.name.lower(),
provider=self.domain,
]
if subpath == "country":
+ items: list[BrowseFolder | Radio] = []
for country in await self.radios.countries(
order=Order.NAME, hide_broken=True, limit=limit, offset=offset
):
)
]
items.append(folder)
+ return items
if subsubpath in await self.get_tag_names(limit=limit, offset=offset):
- items = await self.get_by_tag(subsubpath)
+ return await self.get_by_tag(subsubpath)
if subsubpath in await self.get_country_codes(limit=limit, offset=offset):
- items = await self.get_by_country(subsubpath)
- total = len(items) if len(items) < limit else None
- return PagedItems(items=items, limit=limit, offset=offset, total=total)
+ return await self.get_by_country(subsubpath)
+ return []
async def get_tag_names(self, limit: int, offset: int):
"""Get a list of tag names."""
playlist_obj = await self._soundcloud.get_playlist_details(playlist_id=prov_playlist_id)
if "tracks" not in playlist_obj:
return result
- for index, item in enumerate(playlist_obj["tracks"][offset : offset + limit], 1):
+ if offset:
+ # paging not supported, we always return the whole list at once
+ return []
+ for index, item in enumerate(playlist_obj["tracks"], 1):
song = await self._soundcloud.get_track_details(item["id"])
try:
# TODO: is it really needed to grab the entire track with an api call ?
return None
result = []
# TODO: figure out how to handle paging in YTM
- for index, track_obj in enumerate(playlist_obj["tracks"][offset : offset + limit]):
+ if offset:
+ # paging not supported, we always return the whole list at once
+ return []
+ for index, track_obj in enumerate(playlist_obj["tracks"], 1):
if track_obj["isAvailable"]:
# Playlist tracks sometimes do not have a valid artist id
# In that case, call the API for track details based on track id