self,
item_id: str,
provider_instance_id_or_domain: str,
- limit: int | None = None,
- offset: int | None = None,
+ page: int = 0,
) -> list[PlaylistTrack]:
"""Get tracks for given playlist."""
return [
"music/playlists/playlist_tracks",
item_id=item_id,
provider_instance_id_or_domain=provider_instance_id_or_domain,
- limit=limit,
- offset=offset,
+ page=page,
)
]
async def browse(
self,
path: str | None = None,
- limit: int | None = None,
- offset: int | None = None,
) -> list[MediaItemType | ItemMapping]:
"""Browse Music providers."""
return [
media_from_dict(obj)
- for obj in await self.client.send_command(
- "music/browse",
- path=path,
- limit=limit,
- offset=offset,
- )
+ for obj in await self.client.send_command("music/browse", path=path)
]
async def recently_played(
import random
import time
+from collections.abc import AsyncGenerator
from typing import Any
from music_assistant.common.helpers.json import serialize_to_json
item_id: str,
provider_instance_id_or_domain: str,
force_refresh: bool = False,
- offset: int = 0,
- limit: int = 50,
- prefer_library_items: bool = True,
- ) -> list[PlaylistTrack]:
+ ) -> AsyncGenerator[PlaylistTrack, None]:
"""Return playlist tracks for the given provider playlist id."""
playlist = await self.get(
item_id,
# a playlist can only have one provider so simply pick the first one
prov_map = next(x for x in playlist.provider_mappings)
cache_checksum = playlist.cache_checksum
- # playlist tracks ar enot stored in the db,
+ # playlist tracks are not stored in the db,
# we always fetched them (cached) from the provider
- tracks = await self._get_provider_playlist_tracks(
- prov_map.item_id,
- prov_map.provider_instance,
- cache_checksum=cache_checksum,
- offset=offset,
- limit=limit,
- force_refresh=force_refresh,
- )
- if prefer_library_items:
- final_tracks = []
+ page = 0
+ while True:
+ tracks = await self._get_provider_playlist_tracks(
+ prov_map.item_id,
+ prov_map.provider_instance,
+ cache_checksum=cache_checksum,
+ page=page,
+ force_refresh=force_refresh,
+ )
+ if not tracks:
+ break
for track in tracks:
- # prefer library_item
- # TODO: we could speedup this call by requesting all tracks at once
- # but so far this doesn't seem to be that slow due to the paging
- if track.provider == "library":
- final_tracks.append(track)
- elif db_item := await self.mass.music.tracks.get_library_item_by_prov_id(
- track.item_id, track.provider
- ):
- db_item.position = track.position
- final_tracks.append(db_item)
- else:
- # fall back to the original playlist item if we do not know it in the db
- final_tracks.append(track)
- else:
- final_tracks = tracks
- return final_tracks
+ yield track
+ page += 1
async def create_playlist(
self, name: str, provider_instance_or_domain: str | None = None
raise ProviderUnavailableError(msg)
cur_playlist_track_ids = set()
cur_playlist_track_uris = set()
- for item in await self.get_all_playlist_tracks(playlist):
+ async for item in self.tracks(playlist.item_id, playlist.provider):
cur_playlist_track_uris.add(item.item_id)
cur_playlist_track_uris.add(item.uri)
ids_to_add.add(item_id)
continue
- # ensure we have a full library track
+ # ensure we have a full (library) track (including all provider mappings)
full_track = await self.mass.music.tracks.get(
item_id,
provider_instance_id_or_domain,
recursive=provider_instance_id_or_domain != "library",
)
- if full_track.provider == "library":
- db_track = full_track
- else:
- db_track = await self.mass.music.tracks.add_item_to_library(full_track)
+ track_prov_domains = {x.provider_domain for x in full_track.provider_mappings}
+ if (
+ playlist_prov.domain != "builtin"
+ and playlist_prov.is_streaming_provider
+ and playlist_prov.domain not in track_prov_domains
+ ):
+ # try to match the track to the playlist provider
+ full_track.provider_mappings.update(
+ await self.mass.music.tracks.match_provider(playlist_prov, full_track, False)
+ )
+
# a track can contain multiple versions on the same provider
# simply sort by quality and just add the first available version
for track_version in sorted(
- db_track.provider_mappings, key=lambda x: x.quality, reverse=True
+ full_track.provider_mappings, key=lambda x: x.quality, reverse=True
):
if not track_version.available:
continue
if track_version_uri in cur_playlist_track_uris:
self.logger.warning(
"Not adding %s to playlist %s - it already exists",
- db_track.name,
+ full_track.name,
playlist.name,
)
break # already existing in the playlist
ids_to_add.add(track_version_uri)
self.logger.info(
"Adding %s to playlist %s",
- db_track.name,
+ full_track.name,
playlist.name,
)
break
ids_to_add.add(track_version.item_id)
self.logger.info(
"Adding %s to playlist %s",
- db_track.name,
+ full_track.name,
playlist.name,
)
break
else:
self.logger.warning(
"Can't add %s to playlist %s - it is not available provider %s",
- db_track.name,
+ full_track.name,
playlist.name,
playlist_prov.name,
)
playlist.cache_checksum = str(time.time())
await self.update_item_in_library(db_playlist_id, playlist)
- async def get_all_playlist_tracks(
- self, playlist: Playlist, prefer_library_items: bool = False
- ) -> list[PlaylistTrack]:
- """Return all tracks for given playlist (by unwrapping the paged listing)."""
- result: list[PlaylistTrack] = []
- offset = 0
- limit = 50
- self.logger.debug(
- "Fetching all tracks for playlist %s",
- playlist.name,
- )
- while True:
- paged_items = await self.tracks(
- item_id=playlist.item_id,
- provider_instance_id_or_domain=playlist.provider,
- offset=offset,
- limit=limit,
- prefer_library_items=prefer_library_items,
- )
- result += paged_items
- if len(paged_items) > limit:
- # this happens if the provider doesn't support paging
- # and it does simply return all items in one call
- break
- if len(paged_items) == 0:
- break
- if len(paged_items) < (limit - 20):
- # if get get less than 30 items, we assume this is the end
- # note that we account for the fact that the provider might
- # return less than the limit (e.g. 20 items) due to track unavailability
- break
-
- offset += limit
- return result
-
async def _add_library_item(self, item: Playlist) -> int:
"""Add a new record to the database."""
new_item = await self.mass.music.database.insert(
item_id: str,
provider_instance_id_or_domain: str,
cache_checksum: Any = None,
- offset: int = 0,
- limit: int = 50,
+ page: int = 0,
force_refresh: bool = False,
) -> list[PlaylistTrack]:
"""Return playlist tracks for the given provider playlist id."""
if not provider:
return []
# prefer cache items (if any)
- cache_key = f"{provider.lookup_key}.playlist.{item_id}.tracks.{offset}.{limit}"
+ cache_key = f"{provider.lookup_key}.playlist.{item_id}.tracks.{page}"
if (
not force_refresh
and (cache := await self.mass.cache.get(cache_key, checksum=cache_checksum)) is not None
return [PlaylistTrack.from_dict(x) for x in cache]
# no items in cache (or force_refresh) - get listing from provider
result: list[Track] = []
- for item in await provider.get_playlist_tracks(item_id, offset=offset, limit=limit):
+ for item in await provider.get_playlist_tracks(item_id, page=page):
# double check if position set
assert item.position is not None, "Playlist items require position to be set"
result.append(item)
playlist = await self.get(item_id, provider_instance_id_or_domain)
playlist_tracks = [
x
- for x in await self.get_all_playlist_tracks(playlist)
+ async for x in self.tracks(playlist.item_id, playlist.provider)
# filter out unavailable tracks
if x.available
]
radio_items: list[Track] = []
radio_item_titles: set[str] = set()
- playlist_tracks = await self.get_all_playlist_tracks(media_item, prefer_library_items=True)
+ playlist_tracks = [x async for x in self.tracks(media_item.item_id, media_item.provider)]
random.shuffle(playlist_tracks)
for playlist_track in playlist_tracks:
+ # prefer library item if available so we can use all providers
+ if playlist_track.provider != "library" and (
+ db_item := await self.mass.music.tracks.get_library_item_by_prov_id(
+ playlist_track.item_id, playlist_track.provider
+ )
+ ):
+ playlist_track = db_item # noqa: PLW2901
+
if not playlist_track.available:
continue
# include base item in the list
MusicAssistantError,
UnsupportedFeaturedException,
)
-from music_assistant.common.models.media_items import Album, Artist, ItemMapping, Track, UniqueList
+from music_assistant.common.models.media_items import (
+ Album,
+ Artist,
+ ItemMapping,
+ ProviderMapping,
+ Track,
+ UniqueList,
+)
from music_assistant.constants import (
DB_TABLE_ALBUM_TRACKS,
DB_TABLE_ALBUMS,
compare_track,
loose_compare_strings,
)
+from music_assistant.server.models.music_provider import MusicProvider
from .base import MediaControllerBase
continue
if not provider.library_supported(MediaType.TRACK):
continue
- self.logger.debug(
- "Trying to match track %s on provider %s", db_track.name, provider.name
+ provider_matches = await self.match_provider(
+ provider, db_track, strict=True, ref_albums=track_albums
)
- match_found = False
+ for provider_mapping in provider_matches:
+ # 100% match, we update the db with the additional provider mapping(s)
+ await self.add_provider_mapping(db_track.item_id, provider_mapping)
+ db_track.provider_mappings.add(provider_mapping)
+
+ async def match_provider(
+ self,
+ provider: MusicProvider,
+ ref_track: Track,
+ strict: bool = True,
+ ref_albums: list[Album] | None = None,
+ ) -> set[ProviderMapping]:
+ """Try to find matching track on given provider."""
+ if ref_albums is None:
+ ref_albums = await self.albums(ref_track.item_id, ref_track.provider)
+ if ProviderFeature.SEARCH not in provider.supported_features:
+ raise UnsupportedFeaturedException("Provider does not support search")
+ if not provider.is_streaming_provider:
+ raise UnsupportedFeaturedException("Matching only possible for streaming providers")
+ self.logger.debug("Trying to match track %s on provider %s", ref_track.name, provider.name)
+ matches: set[ProviderMapping] = set()
+ for artist in ref_track.artists:
+ if matches:
+ break
for search_str in (
- db_track.name,
- f"{db_track.artists[0].name} - {db_track.name}",
- f"{db_track.artists[0].name} {db_track.name}",
+ ref_track.name,
+ f"{artist.name} - {ref_track.name}",
+ f"{artist.name} {ref_track.name}",
):
- if match_found:
+ if matches:
break
search_result = await self.search(search_str, provider.domain)
for search_result_item in search_result:
if not search_result_item.available:
continue
# do a basic compare first
- if not compare_media_item(db_track, search_result_item, strict=False):
+ if not compare_media_item(ref_track, search_result_item, strict=False):
continue
# we must fetch the full version, search results can be simplified objects
prov_track = await self.get_provider_item(
search_result_item.provider,
fallback=search_result_item,
)
- if compare_track(db_track, prov_track, strict=True, track_albums=track_albums):
- # 100% match, we update the db with the additional provider mapping(s)
- match_found = True
- for provider_mapping in search_result_item.provider_mappings:
- await self.add_provider_mapping(db_track.item_id, provider_mapping)
- db_track.provider_mappings.add(provider_mapping)
+ if compare_track(ref_track, prov_track, strict=strict, track_albums=ref_albums):
+ matches.update(search_result_item.provider_mappings)
- if not match_found:
- self.logger.debug(
- "Could not find match for Track %s on provider %s",
- db_track.name,
- provider.name,
- )
+ if not matches:
+ self.logger.debug(
+ "Could not find match for Track %s on provider %s",
+ ref_track.name,
+ provider.name,
+ )
+ return matches
async def _get_provider_dynamic_tracks(
self,
return result
@api_command("music/browse")
- async def browse(self, offset: int, limit: int, path: str | None = None) -> list[MediaItemType]:
+ async def browse(self, path: str | None = None) -> list[MediaItemType]:
"""Browse Music providers."""
if not path or path == "root":
# root level; folder per provider
)
if not prov:
return prepend_items
- elif offset == 0:
+ else:
back_path = f"{provider_instance}://" + "/".join(sub_path.split("/")[:-1])
prepend_items.append(
BrowseFolder(item_id="back", provider=provider_instance, path=back_path, name="..")
)
# limit -1 to account for the prepended items
- prov_items = await prov.browse(path=path, offset=offset, limit=limit)
+ prov_items = await prov.browse(path=path)
return prepend_items + prov_items
@api_command("music/recently_played_items")
provider = self.mass.get_provider(item.provider)
if provider.library_edit_supported(item.media_type):
await provider.library_add(item)
+ # ensure a full item
+ item = await ctrl.get(item.item_id, item.provider)
library_item = await ctrl.add_item_to_library(item)
# perform full metadata scan (and provider match)
await self.mass.metadata.update_metadata(library_item)
# race conditions when multiple providers are syncing at the same time.
async with self._sync_lock:
await provider.sync_library(media_types)
+ # precache playlist tracks
+ if MediaType.PLAYLIST in media_types:
+ for playlist in await self.playlists.library_items(provider=provider_instance):
+ async for _ in self.playlists.tracks(playlist.item_id, playlist.provider):
+ pass
# we keep track of running sync tasks
task = self.mass.create_task(run_sync())
if radio_mode:
radio_source.append(media_item)
elif media_item.media_type == MediaType.PLAYLIST:
- tracks += await self.mass.music.playlists.get_all_playlist_tracks(media_item)
+ first_track_seen: bool = False
+ async for playlist_track in self.mass.music.playlists.tracks(
+ media_item.item_id, media_item.provider
+ ):
+ if not playlist_track.available:
+ continue
+ # allow first track to start playing immediately while we still
+ # work out the rest of the queue
+ if (
+ not queue.shuffle_enabled
+ and not first_track_seen
+ and option == QueueOption.REPLACE
+ and not start_item
+ ):
+ first_track_seen = True
+ self.load(
+ queue_id,
+ queue_items=[QueueItem.from_media_item(queue_id, playlist_track)],
+ keep_remaining=False,
+ keep_played=False,
+ )
+ await self.play_index(queue_id, 0)
+ # add the remaining items
+ option = QueueOption.ADD
+ else:
+ tracks.append(playlist_track)
self.mass.create_task(
self.mass.music.mark_item_played(
media_item.media_type, media_item.item_id, media_item.provider
try:
args = parse_arguments(handler.signature, handler.type_hints, msg.args)
result = handler.target(**args)
- if asyncio.iscoroutine(result):
+ if hasattr(result, "__anext__"):
+ # handle async generator
+ result = [x async for x in result]
+ elif asyncio.iscoroutine(result):
result = await result
self._send_message(SuccessResultMessage(msg.message_id, result))
except Exception as err: # pylint: disable=broad-except
raise NotImplementedError
async def get_playlist_tracks(
- self, prov_playlist_id: str, offset: int, limit: int
+ self,
+ prov_playlist_id: str,
+ page: int = 0,
) -> list[Track]:
"""Get all playlist tracks for given playlist id."""
if ProviderFeature.LIBRARY_PLAYLISTS in self.supported_features:
return await self.get_radio(prov_item_id)
return await self.get_track(prov_item_id)
- async def browse(
- self, path: str, offset: int, limit: int
- ) -> Sequence[MediaItemType | ItemMapping]:
+ async def browse(self, path: str) -> Sequence[MediaItemType | ItemMapping]:
"""Browse this provider's items.
:param path: The path to browse, (e.g. provider_id://artists).
subpath = path.split("://", 1)[1]
# this reference implementation can be overridden with a provider specific approach
if subpath == "artists":
- return await self.mass.music.artists.library_items(
- limit=limit, offset=offset, provider=self.instance_id
- )
+ return await self.mass.music.artists.library_items(provider=self.instance_id)
if subpath == "albums":
- return await self.mass.music.albums.library_items(
- limit=limit, offset=offset, provider=self.instance_id
- )
+ return await self.mass.music.albums.library_items(provider=self.instance_id)
if subpath == "tracks":
- return await self.mass.music.tracks.library_items(
- limit=limit, offset=offset, provider=self.instance_id
- )
+ return await self.mass.music.tracks.library_items(provider=self.instance_id)
if subpath == "radios":
- return await self.mass.music.radio.library_items(
- limit=limit, offset=offset, provider=self.instance_id
- )
+ return await self.mass.music.radio.library_items(provider=self.instance_id)
if subpath == "playlists":
- return await self.mass.music.playlists.library_items(
- limit=limit, offset=offset, provider=self.instance_id
- )
+ return await self.mass.music.playlists.library_items(provider=self.instance_id)
if subpath:
# unknown path
msg = "Invalid subpath"
)
self.mass.players.register_or_update(mass_player)
# update can_sync_with field of all other players
+ # this ensure that the field always contains all player ids,
+ # even when a player joins later on
for player in self.players:
if player.player_id == player_id:
continue
tracks.append(track)
return tracks
- async def get_playlist_tracks(self, prov_playlist_id, offset, limit) -> list[Track]:
+ async def get_playlist_tracks(self, prov_playlist_id, page: int = 0) -> list[Track]:
"""Get all playlist tracks for given playlist id."""
if self._is_catalog_id(prov_playlist_id):
endpoint = f"catalog/{self._storefront}/playlists/{prov_playlist_id}/tracks"
else:
endpoint = f"me/library/playlists/{prov_playlist_id}/tracks"
result = []
+ page_size = 200
+ offset = page * page_size
response = await self._get_data(
- endpoint, include="artists,catalog", limit=limit, offset=offset
+ endpoint, include="artists,catalog", limit=page_size, offset=offset
)
if not response or "data" not in response:
return result
self.mass.config.set(key, stored_items)
return True
- async def get_playlist_tracks(
- self, prov_playlist_id: str, offset: int, limit: int
- ) -> list[Track]:
+ async def get_playlist_tracks(self, prov_playlist_id: str, page: int = 0) -> list[Track]:
"""Get playlist tracks."""
+ if page > 0:
+ # paging not supported, we always return the whole list at once
+ return []
if prov_playlist_id in BUILTIN_PLAYLISTS:
- if offset:
- # paging not supported, we always return the whole list at once
- return []
return await self._get_builtin_playlist_tracks(prov_playlist_id)
# user created universal playlist
result: list[Track] = []
- playlist_items = await self._read_playlist_file_items(prov_playlist_id, offset, limit)
- for index, uri in enumerate(playlist_items):
+ playlist_items = await self._read_playlist_file_items(prov_playlist_id)
+ for index, uri in enumerate(playlist_items, 1):
try:
media_type, provider_instance_id_or_domain, item_id = await parse_uri(uri)
media_controller = self.mass.music.get_controller(media_type)
item_id, provider_instance_id_or_domain
)
assert isinstance(track, Track)
- track.position = offset + index
+ track.position = index
result.append(track)
except (MediaNotFoundError, InvalidDataError, ProviderUnavailableError) as err:
self.logger.warning(
except KeyError:
raise MediaNotFoundError(f"No built in playlist: {builtin_playlist_id}")
- async def _read_playlist_file_items(
- self, playlist_id: str, offset: int = 0, limit: int = 100000
- ) -> list[str]:
+ async def _read_playlist_file_items(self, playlist_id: str) -> list[str]:
"""Return lines of a playlist file."""
playlist_file = os.path.join(self._playlists_dir, playlist_id)
if not await asyncio.to_thread(os.path.isfile, playlist_file):
aiofiles.open(playlist_file, "r", encoding="utf-8") as _file,
):
lines = await _file.readlines()
- return [x.strip() for x in lines[offset : offset + limit]]
+ return [x.strip() for x in lines]
async def _write_playlist_file_items(self, playlist_id: str, lines: list[str]) -> None:
"""Return lines of a playlist file."""
for deezer_track in await album.get_tracks()
]
- async def get_playlist_tracks(
- self, prov_playlist_id: str, offset: int, limit: int
- ) -> list[Track]:
+ async def get_playlist_tracks(self, prov_playlist_id: str, page: int = 0) -> list[Track]:
"""Get playlist tracks."""
result: list[Track] = []
- if offset:
+ if page > 0:
# paging not supported, we always return the whole list at once
return []
# TODO: access the underlying paging on the deezer api (if possible))
self.parse_track(
track=deezer_track,
user_country=self.gw_client.user_country,
- position=offset + index,
+ position=index,
)
)
return result
)
return result
- async def browse(self, path: str, offset: int, limit: int) -> list[MediaItemType | ItemMapping]:
+ async def browse(self, path: str) -> list[MediaItemType | ItemMapping]:
"""Browse this provider's items.
:param path: The path to browse, (e.g. provid://artists).
"""
- if offset:
- # we do not support pagination
- return []
items: list[MediaItemType | ItemMapping] = []
item_path = path.split("://", 1)[1]
if not item_path:
if any(x.provider_instance == self.instance_id for x in track.provider_mappings)
]
- async def get_playlist_tracks(
- self, prov_playlist_id: str, offset: int, limit: int
- ) -> list[Track]:
+ async def get_playlist_tracks(self, prov_playlist_id: str, page: int = 0) -> list[Track]:
"""Get playlist tracks."""
result: list[Track] = []
+ if page > 0:
+ # paging not (yet) supported
+ return result
if not await self.exists(prov_playlist_id):
msg = f"Playlist path does not exist: {prov_playlist_id}"
raise MediaNotFoundError(msg)
else:
playlist_lines = parse_pls(playlist_data)
- playlist_lines = playlist_lines[offset : offset + limit]
-
- for line_no, playlist_line in enumerate(playlist_lines):
+ for idx, playlist_line in enumerate(playlist_lines, 1):
if track := await self._parse_playlist_line(
playlist_line.path, os.path.dirname(prov_playlist_id)
):
- track.position = offset + line_no
+ track.position = idx
result.append(track)
except Exception as err: # pylint: disable=broad-except
playlist_items = parse_pls(playlist_data)
# remove items by index
for i in sorted(positions_to_remove, reverse=True):
- del playlist_items[i]
+ # position = index + 1
+ del playlist_items[i - 1]
# build new playlist data
new_playlist_data = "#EXTM3U\n"
for item in playlist_items:
supported_features: list[PlayerFeature] = []
if MediaPlayerEntityFeature.GROUPING in hass_supported_features:
supported_features.append(PlayerFeature.SYNC)
+ if MediaPlayerEntityFeature.PAUSE in hass_supported_features:
+ supported_features.append(PlayerFeature.PAUSE)
if MediaPlayerEntityFeature.MEDIA_ENQUEUE in hass_supported_features:
supported_features.append(PlayerFeature.ENQUEUE_NEXT)
if MediaPlayerEntityFeature.VOLUME_SET in hass_supported_features:
raise MediaNotFoundError(f"Item {prov_playlist_id} not found")
return parse_playlist(self.instance_id, self._client, playlist)
- async def get_playlist_tracks(
- self, prov_playlist_id: str, offset: int, limit: int
- ) -> list[Track]:
+ async def get_playlist_tracks(self, prov_playlist_id: str, page: int = 0) -> list[Track]:
"""Get playlist tracks."""
result: list[Track] = []
- if offset:
+ if page > 0:
# paging not supported, we always return the whole list at once
return []
# TODO: Does Jellyfin support paging here?
self.logger, self.instance_id, self._client, jellyfin_track
):
if not track.position:
- track.position = offset + index
+ track.position = index
result.append(track)
except (KeyError, ValueError) as err:
self.logger.error(
raise MediaNotFoundError(msg) from e
return self._parse_playlist(sonic_playlist)
- async def get_playlist_tracks(
- self, prov_playlist_id: str, offset: int, limit: int
- ) -> list[Track]:
+ async def get_playlist_tracks(self, prov_playlist_id: str, page: int = 0) -> list[Track]:
"""Get playlist tracks."""
result: list[Track] = []
+ if page > 0:
+ # paging not supported, we always return the whole list at once
+ return result
try:
sonic_playlist: SonicPlaylist = await self._run_async(
self._conn.getPlaylist, prov_playlist_id
except (ParameterError, DataNotFoundError) as e:
msg = f"Playlist {prov_playlist_id} not found"
raise MediaNotFoundError(msg) from e
- if offset:
- # paging not supported, we always return the whole list at once
- return []
+
# TODO: figure out if subsonic supports paging here
for index, sonic_song in enumerate(sonic_playlist.songs, 1):
track = self._parse_track(sonic_song)
- track.position = offset + index
+ track.position = index
result.append(track)
return result
self, prov_playlist_id: str, positions_to_remove: tuple[int, ...]
) -> None:
"""Remove selected positions from the playlist."""
+ idx_to_remove = [pos - 1 for pos in positions_to_remove]
try:
await self._run_async(
self._conn.updatePlaylist,
lid=prov_playlist_id,
- songIndexesToRemove=list(positions_to_remove),
+ songIndexesToRemove=idx_to_remove,
)
except SonicError:
msg = f"Failed to remove songs from {prov_playlist_id}, check your permissions."
if token == AUTH_TOKEN_UNAUTH:
# Doing local connection, not via plex.tv.
plex_server = PlexServer(plex_url)
- # I don't think PlexAPI intends for this to be accessible, but we need it.
- self._baseurl = plex_server._baseurl
else:
plex_server = PlexServer(
plex_url,
token,
session=session,
)
+ # I don't think PlexAPI intends for this to be accessible, but we need it.
+ self._baseurl = plex_server._baseurl
except plexapi.exceptions.BadRequest as err:
if "Invalid token" in str(err):
msg = f"Item {prov_playlist_id} not found"
raise MediaNotFoundError(msg)
- async def get_playlist_tracks(
- self, prov_playlist_id: str, offset: int, limit: int
- ) -> list[Track]:
+ async def get_playlist_tracks(self, prov_playlist_id: str, page: int = 0) -> list[Track]:
"""Get playlist tracks."""
result: list[Track] = []
- if offset:
+ if page > 0:
# paging not supported, we always return the whole list at once
return []
plex_playlist: PlexPlaylist = await self._get_data(prov_playlist_id, PlexPlaylist)
if (item and item["id"])
]
- async def get_playlist_tracks(
- self, prov_playlist_id: str, offset: int, limit: int
- ) -> list[Track]:
+ async def get_playlist_tracks(self, prov_playlist_id: str, page: int = 0) -> list[Track]:
"""Get playlist tracks."""
result: list[Track] = []
+ page_size = 100
+ offset = page * page_size
qobuz_result = await self._get_data(
"playlist/get",
key="tracks",
playlist_id=prov_playlist_id,
extra="tracks",
offset=offset,
- limit=limit,
+ limit=page_size,
)
- for index, track_obj in enumerate(qobuz_result["tracks"]["items"]):
+ for index, track_obj in enumerate(qobuz_result["tracks"]["items"], 1):
if not (track_obj and track_obj["id"]):
continue
track = await self._parse_track(track_obj)
"""Remove track(s) from playlist."""
playlist_track_ids = set()
for pos in positions_to_remove:
- for track in await self.get_playlist_tracks(prov_playlist_id, pos, pos):
- playlist_track_ids.add(str(track["playlist_track_id"]))
+ idx = pos - 1
+ qobuz_result = await self._get_data(
+ "playlist/get",
+ key="tracks",
+ playlist_id=prov_playlist_id,
+ extra="tracks",
+ offset=idx,
+ limit=1,
+ )
+ if not qobuz_result:
+ continue
+ playlist_track_id = qobuz_result["tracks"]["items"][0]["playlist_track_id"]
+ playlist_track_ids.add(str(playlist_track_id))
+
return await self._get_data(
"playlist/deleteTracks",
playlist_id=prov_playlist_id,
return result
- async def browse(self, path: str, offset: int, limit: int) -> Sequence[MediaItemType]:
+ async def browse(self, path: str) -> Sequence[MediaItemType]:
"""Browse this provider's items.
:param path: The path to browse, (e.g. provid://artists).
"""
- if offset != 0:
- # paging is broken on RadioBrowser, we just return some big lists
- return []
subpath = path.split("://", 1)[1]
subsubpath = "" if "/" not in subpath else subpath.split("/")[-1]
self.mass_player.can_sync_with = tuple(
x.player_id
for x in self.sonos_prov.sonosplayers.values()
- if x.sync_coordinator is None and x.player_id != self.player_id
+ if x.player_id != self.player_id
)
if self.sync_coordinator:
# player is syned to another player
self.logger.debug("Parse playlist failed: %s", playlist_obj, exc_info=error)
return playlist
- async def get_playlist_tracks(
- self, prov_playlist_id: str, offset: int, limit: int
- ) -> list[Track]:
+ async def get_playlist_tracks(self, prov_playlist_id: str, page: int = 0) -> list[Track]:
"""Get playlist tracks."""
result: list[Track] = []
- # TODO: soundcloud doesn't seem to support paging for playlist tracks ?!
+ if page > 0:
+ # TODO: soundcloud doesn't seem to support paging for playlist tracks ?!
+ return result
playlist_obj = await self._soundcloud.get_playlist_details(playlist_id=prov_playlist_id)
if "tracks" not in playlist_obj:
return result
- if offset:
- # paging not supported, we always return the whole list at once
- return []
for index, item in enumerate(playlist_obj["tracks"], 1):
+ # TODO: is it really needed to grab the entire track with an api call ?
song = await self._soundcloud.get_track_details(item["id"])
try:
- # TODO: is it really needed to grab the entire track with an api call ?
- if track := await self._parse_track(song[0], index + offset):
+ if track := await self._parse_track(song[0], index):
result.append(track)
except (KeyError, TypeError, InvalidDataError, IndexError) as error:
self.logger.debug("Parse track failed: %s", song, exc_info=error)
if item["id"]
]
- async def get_playlist_tracks(
- self, prov_playlist_id: str, offset: int, limit: int
- ) -> list[Track]:
+ async def get_playlist_tracks(self, prov_playlist_id: str, page: int = 0) -> list[Track]:
"""Get playlist tracks."""
result: list[Track] = []
uri = (
if prov_playlist_id == self._get_liked_songs_playlist_id()
else f"playlists/{prov_playlist_id}/tracks"
)
- spotify_result = await self._get_data(uri, limit=limit, offset=offset)
+ page_size = 50
+ offset = page * page_size
+ spotify_result = await self._get_data(uri, limit=page_size, offset=offset)
for index, item in enumerate(spotify_result["items"], 1):
if not (item and item["track"] and item["track"]["id"]):
continue
"""Remove track(s) from playlist."""
track_uris = []
for pos in positions_to_remove:
- for track in await self.get_playlist_tracks(prov_playlist_id, pos, pos):
- track_uris.append({"uri": f"spotify:track:{track.item_id}"})
+ uri = f"playlists/{prov_playlist_id}/tracks"
+ spotify_result = await self._get_data(uri, limit=1, offset=pos - 1)
+ for item in spotify_result["items"]:
+ if not (item and item["track"] and item["track"]["id"]):
+ continue
+ track_uris.append({"uri": f'spotify:track:{item["track"]["id"]}'})
data = {"tracks": track_uris}
await self._delete_data(f"playlists/{prov_playlist_id}/tracks", data=data)
self.logger.warning(f"Failed to get toptracks for artist {prov_artist_id}: {err}")
return []
- async def get_playlist_tracks(
- self, prov_playlist_id: str, offset: int, limit: int
- ) -> list[Track]:
+ async def get_playlist_tracks(self, prov_playlist_id: str, page: int = 0) -> list[Track]:
"""Get playlist tracks."""
tidal_session = await self._get_tidal_session()
result: list[Track] = []
+ page_size = 200
+ offset = page * page_size
track_obj: TidalTrack # satisfy the type checker
tidal_tracks = await get_playlist_tracks(
- tidal_session, prov_playlist_id, limit=limit, offset=offset
+ tidal_session, prov_playlist_id, limit=page_size, offset=offset
)
for index, track_obj in enumerate(tidal_tracks, 1):
track = self._parse_track(track_obj=track_obj)
"""Remove track(s) from playlist."""
prov_track_ids = []
tidal_session = await self._get_tidal_session()
- for track in await self.get_playlist_tracks(prov_playlist_id, 0, 10000):
- if track.position in positions_to_remove:
- prov_track_ids.append(track.item_id)
- if len(prov_track_ids) == len(positions_to_remove):
- break
+ for pos in positions_to_remove:
+ for tidal_track in await get_playlist_tracks(
+ tidal_session, prov_playlist_id, limit=1, offset=pos - 1
+ ):
+ prov_track_ids.append(tidal_track.id)
return await remove_playlist_tracks(tidal_session, prov_playlist_id, prov_track_ids)
async def create_playlist(self, name: str) -> Playlist:
)
},
)
+ various_artist_album: bool = False
for artist_obj in album_obj.artists:
+ if artist_obj.name == "Various Artists":
+ various_artist_album = True
album.artists.append(self._parse_artist(artist_obj))
- if album_obj.type == "ALBUM":
- album.album_type = AlbumType.ALBUM
- elif album_obj.type == "COMPILATION":
+
+ if album_obj.type == "COMPILATION" or various_artist_album:
album.album_type = AlbumType.COMPILATION
+ elif album_obj.type == "ALBUM":
+ album.album_type = AlbumType.ALBUM
elif album_obj.type == "EP":
album.album_type = AlbumType.EP
elif album_obj.type == "SINGLE":
msg = f"Item {prov_playlist_id} not found"
raise MediaNotFoundError(msg)
- async def get_playlist_tracks(
- self, prov_playlist_id: str, offset: int, limit: int
- ) -> list[Track]:
+ async def get_playlist_tracks(self, prov_playlist_id: str, page: int = 0) -> list[Track]:
"""Return playlist tracks for the given provider playlist id."""
+ if page > 0:
+ # paging not supported, we always return the whole list at once
+ return []
await self._check_oauth_token()
# Grab the playlist id from the full url in case of personal playlists
if YT_PLAYLIST_ID_DELIMITER in prov_playlist_id:
return None
result = []
# TODO: figure out how to handle paging in YTM
- if offset:
- # paging not supported, we always return the whole list at once
- return []
for index, track_obj in enumerate(playlist_obj["tracks"], 1):
if track_obj["isAvailable"]:
# Playlist tracks sometimes do not have a valid artist id
# In that case, call the API for track details based on track id
try:
if track := self._parse_track(track_obj):
- track.position = index + 1
+ track.position = index
result.append(track)
except InvalidDataError:
if track := await self.get_track(track_obj["videoId"]):
- track.position = index + 1
+ track.position = index
result.append(track)
# YTM doesn't seem to support paging so we ignore offset and limit
return result
artist_obj = await get_artist(prov_artist_id=prov_artist_id, headers=self._headers)
if artist_obj.get("songs") and artist_obj["songs"].get("browseId"):
prov_playlist_id = artist_obj["songs"]["browseId"]
- return await self.get_playlist_tracks(prov_playlist_id, 0, 25)
+ playlist_tracks = await self.get_playlist_tracks(prov_playlist_id)
+ return playlist_tracks[:25]
return []
async def library_add(self, item: MediaItemType) -> bool: