import logging
from collections.abc import Iterable
from dataclasses import dataclass
+from enum import Enum
from types import NoneType
from typing import Any
"""Parse Config from the raw values (as stored in persistent storage)."""
conf = cls.from_dict({**raw, "values": {}})
for entry in config_entries:
+ # unpack Enum value in default_value
+ if isinstance(entry.default_value, Enum):
+ entry.default_value = entry.default_value.value
# create a copy of the entry
conf.values[entry.key] = ConfigEntry.from_dict(entry.to_dict())
conf.values[entry.key].parse_value(
items: list[_T],
limit: int,
offset: int,
+ total: int | None,
count: int | None = None,
- total: int | None = None,
):
"""Initialize PagedItems."""
self.items = items
self.limit = limit
self.offset = offset
self.total = total
- if total is None and count != limit:
- # total is important so always calculate it from count if omitted
- self.total = offset + count
def to_dict(self, *args, **kwargs) -> dict[str, Any]:
"""Return PagedItems as serializable dict."""
from music_assistant.common.helpers.json import serialize_to_json
from music_assistant.common.models.enums import ProviderFeature
-from music_assistant.common.models.errors import MediaNotFoundError, UnsupportedFeaturedException
+from music_assistant.common.models.errors import (
+ MediaNotFoundError,
+ ProviderUnavailableError,
+ UnsupportedFeaturedException,
+)
from music_assistant.common.models.media_items import (
Album,
AlbumType,
if len(ref_tracks) < 10:
# fetch reference tracks from provider(s) attached to the artist
for provider_mapping in db_artist.provider_mappings:
- ref_tracks += await self.mass.music.artists.tracks(
- provider_mapping.item_id, provider_mapping.provider_instance
- )
+ with contextlib.suppress(ProviderUnavailableError, MediaNotFoundError):
+ ref_tracks += await self.mass.music.artists.tracks(
+ provider_mapping.item_id, provider_mapping.provider_instance
+ )
for ref_track in ref_tracks:
for search_str in (
f"{db_artist.name} - {ref_track.name}",
if len(ref_albums) < 10:
# fetch reference albums from provider(s) attached to the artist
for provider_mapping in db_artist.provider_mappings:
- ref_albums += await self.mass.music.artists.albums(
- provider_mapping.item_id, provider_mapping.provider_instance
- )
+ with contextlib.suppress(ProviderUnavailableError, MediaNotFoundError):
+ ref_albums += await self.mass.music.artists.albums(
+ provider_mapping.item_id, provider_mapping.provider_instance
+ )
for ref_album in ref_albums:
if ref_album.album_type == AlbumType.COMPILATION:
continue
force_refresh=force_refresh,
lazy=not force_refresh,
)
- prov = next(x for x in playlist.provider_mappings)
+ prov_map = next(x for x in playlist.provider_mappings)
+ cache_checksum = playlist.metadata.cache_checksum
tracks = await self._get_provider_playlist_tracks(
- prov.item_id,
- prov.provider_instance,
- cache_checksum=playlist.metadata.cache_checksum,
+ prov_map.item_id,
+ prov_map.provider_instance,
+ cache_checksum=cache_checksum,
offset=offset,
limit=limit,
)
final_tracks.append(track)
else:
final_tracks = tracks
- return PagedItems(items=final_tracks, limit=limit, offset=offset)
+ # we set total to None as we have no idea how many tracks there are
+ # the frontend can figure this out and stop paging when it gets an empty list
+ return PagedItems(items=final_tracks, limit=limit, offset=offset, total=None)
async def create_playlist(
self, name: str, provider_instance_or_domain: str | None = None
playlist.name,
)
while True:
- paged_items = await self.mass.music.playlists.tracks(
+ paged_items = await self.tracks(
item_id=playlist.item_id,
provider_instance_id_or_domain=playlist.provider,
offset=offset,
prefer_library_items=prefer_library_items,
)
result += paged_items.items
- if paged_items.count != limit:
+ if paged_items.total is not None and len(result) >= paged_items.total:
+ break
+ if paged_items.count == 0:
break
offset += paged_items.count
return result
name=prov.name,
)
)
- return PagedItems(items=root_items, limit=limit, offset=offset)
+ return PagedItems(items=root_items, limit=limit, offset=offset, total=len(root_items))
# provider level
prepend_items: list[MediaItemType] = []
BrowseFolder(item_id="root", provider="library", path="root", name="..")
)
if not prov:
- return PagedItems(items=prepend_items, limit=limit, offset=offset)
+ return PagedItems(
+ items=prepend_items, limit=limit, offset=offset, total=len(prepend_items)
+ )
elif offset == 0:
back_path = f"{provider_instance}://" + "/".join(sub_path.split("/")[:-1])
prepend_items.append(
# limit -1 to account for the prepended items
prov_items = await prov.browse(path, offset=offset, limit=limit)
prov_items.items = prepend_items + prov_items.items
+ if prov_items.total is not None:
+ prov_items.total += len(prepend_items)
return prov_items
@api_command("music/recently_played_items")
def items(self, queue_id: str, limit: int = 500, offset: int = 0) -> PagedItems[QueueItem]:
"""Return all QueueItems for given PlayerQueue."""
if queue_id not in self._queue_items:
- return PagedItems(items=[], limit=limit, offset=offset)
+ return PagedItems(items=[], limit=limit, offset=offset, total=0)
return PagedItems(
items=self._queue_items[queue_id][offset : offset + limit],
items.append(item)
if len(items) >= limit:
break
+ # explicitly set total to None as we don't know the total count
+ total = None
else:
# no subpath: return main listing
if ProviderFeature.LIBRARY_ARTISTS in self.supported_features:
label="radios",
)
)
- return PagedItems(items=items, limit=limit, offset=offset)
+ total = len(items)
+ return PagedItems(items=items, limit=limit, offset=offset, total=total)
async def recommendations(self) -> list[MediaItemType]:
"""Get this provider's recommendations.
) -> list[Track]:
"""Get playlist tracks."""
if prov_playlist_id in BUILTIN_PLAYLISTS:
- return await self._get_builtin_playlist_tracks(prov_playlist_id)
+ result = await self._get_builtin_playlist_tracks(prov_playlist_id)
+ return result[offset : offset + limit]
# user created universal playlist
result: list[Track] = []
playlist_items = await self._read_playlist_file_items(prov_playlist_id, offset, limit)
result: list[Track] = []
# TODO: implement pagination!
playlist = await self.client.get_playlist(int(prov_playlist_id))
- for index, deezer_track in enumerate(await playlist.get_tracks()):
+ for index, deezer_track in enumerate(await playlist.get_tracks(offset=offset, limit=limit)):
result.append(
self.parse_track(
track=deezer_track,
index += 1
if len(items) >= limit:
break
- return PagedItems(items=items, limit=limit, offset=offset)
+ total = len(items) if len(items) < limit else None
+ return PagedItems(items=items, limit=limit, offset=offset, total=total)
async def sync_library(self, media_types: tuple[MediaType, ...]) -> None:
"""Run library sync for this provider."""
if platform.system() == "Darwin":
password_str = f":{password}" if password else ""
- mount_cmd = f"mount -t smbfs //{username}{password_str}@{server}/{share}{subfolder} {self.base_path}" # noqa: E501
+ mount_cmd = f'mount -t smbfs "//{username}{password_str}@{server}/{share}{subfolder}" "{self.base_path}"' # noqa: E501
elif platform.system() == "Linux":
options = [
options.append(f'password="{password}"')
if mount_options := self.config.get_value(CONF_MOUNT_OPTIONS):
options += mount_options.split(",")
- mount_cmd = f"mount -t cifs -o {','.join(options)} //{server}/{share}{subfolder} {self.base_path}" # noqa: E501
+
+ options_str = ",".join(options)
+ mount_cmd = (
+ f"mount -t cifs -o {options_str} "
+ f'"//{server}/{share}{subfolder}" "{self.base_path}"'
+ )
else:
msg = f"SMB provider is not supported on {platform.system()}"
)\r
if not playlist_items:\r
return result\r
- for index, jellyfin_track in enumerate(playlist_items):\r
+ for index, jellyfin_track in enumerate(playlist_items[offset : offset + limit]):\r
try:\r
if track := await self._parse_track(jellyfin_track):\r
if not track.position:\r
except (ParameterError, DataNotFoundError) as e:
msg = f"Playlist {prov_playlist_id} not found"
raise MediaNotFoundError(msg) from e
- for index, sonic_song in enumerate(sonic_playlist.songs):
+ for index, sonic_song in enumerate(sonic_playlist.songs[offset : offset + limit]):
track = self._parse_track(sonic_song)
track.position = index
result.append(track)
plex_playlist: PlexPlaylist = await self._get_data(prov_playlist_id, PlexPlaylist)
if not (playlist_items := await self._run_async(plex_playlist.items)):
return result
- for index, plex_track in enumerate(playlist_items):
+ for index, plex_track in enumerate(playlist_items[offset : offset + limit]):
if track := await self._parse_track(plex_track):
track.position = index
result.append(track)
]
if subpath == "popular":
- items = await self.get_by_popularity()
+ items = await self.get_by_popularity(limit=limit, offset=offset)
if subpath == "tag":
tags = await self.radios.tags(
hide_broken=True,
- limit=100,
+ limit=limit,
+ offset=offset,
order=Order.STATION_COUNT,
reverse=True,
)
]
if subpath == "country":
- for country in await self.radios.countries(order=Order.NAME):
+ for country in await self.radios.countries(
+ order=Order.NAME, hide_broken=True, limit=limit, offset=offset
+ ):
folder = BrowseFolder(
item_id=country.code.lower(),
provider=self.domain,
]
items.append(folder)
- if subsubpath in await self.get_tag_names():
+ if subsubpath in await self.get_tag_names(limit=limit, offset=offset):
items = await self.get_by_tag(subsubpath)
- if subsubpath in await self.get_country_codes():
+ if subsubpath in await self.get_country_codes(limit=limit, offset=offset):
items = await self.get_by_country(subsubpath)
- return PagedItems(items=items, limit=limit, offset=offset)
+ total = len(items) if len(items) < limit else None
+ return PagedItems(items=items, limit=limit, offset=offset, total=total)
- async def get_tag_names(self):
+ async def get_tag_names(self, limit: int, offset: int):
"""Get a list of tag names."""
tags = await self.radios.tags(
hide_broken=True,
- limit=100,
+ limit=limit,
+ offset=offset,
order=Order.STATION_COUNT,
reverse=True,
)
tag_names.append(tag.name.lower())
return tag_names
- async def get_country_codes(self):
+ async def get_country_codes(self, limit: int, offset: int):
"""Get a list of country names."""
- countries = await self.radios.countries(order=Order.NAME)
+ countries = await self.radios.countries(
+ order=Order.NAME, hide_broken=True, limit=limit, offset=offset
+ )
country_codes = []
for country in countries:
country_codes.append(country.code.lower())
return country_codes
- async def get_by_popularity(self):
+ async def get_by_popularity(self, limit: int, offset: int):
"""Get radio stations by popularity."""
stations = await self.radios.stations(
hide_broken=True,
- limit=250,
+ limit=limit,
+ offset=offset,
order=Order.CLICK_COUNT,
reverse=True,
)
playlist_obj = await self._soundcloud.get_playlist_details(playlist_id=prov_playlist_id)
if "tracks" not in playlist_obj:
return result
- for index, item in enumerate(playlist_obj["tracks"]):
+ for index, item in enumerate(playlist_obj["tracks"][offset : offset + limit]):
song = await self._soundcloud.get_track_details(item["id"])
try:
# TODO: is it really needed to grab the entire track with an api call ?
)
from music_assistant.common.models.streamdetails import StreamDetails
from music_assistant.constants import CONF_USERNAME
-from music_assistant.server.helpers.tags import parse_tags
from music_assistant.server.models.music_provider import MusicProvider
SUPPORTED_FEATURES = (
if "preset_id" not in item:
continue
# each radio station can have multiple streams add each one as different quality
- stream_info = await self.__get_data("Tune.ashx", id=item["preset_id"])
- for stream in stream_info["body"]:
- yield await self._parse_radio(item, stream, folder)
+ stream_info = await self._get_stream_info(item["preset_id"])
+ yield self._parse_radio(item, stream_info, folder)
elif item_type == "link" and item.get("item") == "url":
# custom url
try:
- yield await self._parse_radio(item)
+ yield self._parse_radio(item)
except InvalidDataError as err:
# there may be invalid custom urls, ignore those
self.logger.warning(str(err))
async def get_radio(self, prov_radio_id: str) -> Radio:
"""Get radio station details."""
if not prov_radio_id.startswith("http"):
- prov_radio_id, media_type = prov_radio_id.split("--", 1)
+ if "--" in prov_radio_id:
+ prov_radio_id, media_type = prov_radio_id.split("--", 1)
+ else:
+ media_type = None
params = {"c": "composite", "detail": "listing", "id": prov_radio_id}
result = await self.__get_data("Describe.ashx", **params)
if result and result.get("body") and result["body"][0].get("children"):
item = result["body"][0]["children"][0]
- stream_info = await self.__get_data("Tune.ashx", id=prov_radio_id)
- for stream in stream_info["body"]:
- if stream["media_type"] != media_type:
+ stream_info = await self._get_stream_info(prov_radio_id)
+ for stream in stream_info:
+ if media_type and stream["media_type"] != media_type:
continue
- return await self._parse_radio(item, stream)
+ return self._parse_radio(item, [stream])
# fallback - e.g. for handle custom urls ...
async for radio in self.get_library_radios():
if radio.item_id == prov_radio_id:
msg = f"Item {prov_radio_id} not found"
raise MediaNotFoundError(msg)
- async def _parse_radio(
- self, details: dict, stream: dict | None = None, folder: str | None = None
+ def _parse_radio(
+ self, details: dict, stream_info: list[dict] | None = None, folder: str | None = None
) -> Radio:
"""Parse Radio object from json obj returned from api."""
if "name" in details:
name = name.split(" | ")[1]
name = name.split(" (")[0]
- if stream is None:
- # custom url (no stream object present)
- url = details["URL"]
- item_id = url
- media_info = await parse_tags(url)
- content_type = ContentType.try_parse(media_info.format)
- bit_rate = media_info.bit_rate
+ if stream_info is not None:
+ # stream info is provided: parse stream objects into provider mappings
+ radio = Radio(
+ item_id=details["preset_id"],
+ provider=self.lookup_key,
+ name=name,
+ provider_mappings={
+ ProviderMapping(
+ item_id=f'{details["preset_id"]}--{stream["media_type"]}',
+ provider_domain=self.domain,
+ provider_instance=self.instance_id,
+ audio_format=AudioFormat(
+ content_type=ContentType.try_parse(stream["media_type"]),
+ bit_rate=stream.get("bitrate", 128),
+ ),
+ details=stream["url"],
+ available=details.get("is_available", True),
+ )
+ for stream in stream_info
+ },
+ )
else:
- url = stream["url"]
- item_id = f'{details["preset_id"]}--{stream["media_type"]}'
- content_type = ContentType.try_parse(stream["media_type"])
- bit_rate = stream.get("bitrate", 128) # TODO !
+ # custom url (no stream object present)
+ radio = Radio(
+ item_id=details["URL"],
+ provider=self.lookup_key,
+ name=name,
+ provider_mappings={
+ ProviderMapping(
+ item_id=details["URL"],
+ provider_domain=self.domain,
+ provider_instance=self.instance_id,
+ audio_format=AudioFormat(
+ content_type=ContentType.UNKNOWN,
+ ),
+ details=details["URL"],
+ available=details.get("is_available", True),
+ )
+ },
+ )
- radio = Radio(
- item_id=item_id,
- provider=self.domain,
- name=name,
- provider_mappings={
- ProviderMapping(
- item_id=item_id,
- provider_domain=self.domain,
- provider_instance=self.instance_id,
- audio_format=AudioFormat(
- content_type=content_type,
- bit_rate=bit_rate,
- ),
- details=url,
- available=details.get("is_available", True),
- )
- },
- )
# preset number is used for sorting (not present at stream time)
preset_number = details.get("preset_number", 0)
radio.position = preset_number
]
return radio
+ async def _get_stream_info(self, preset_id: str) -> list[dict]:
+ """Get stream info for a radio station."""
+ cache_key = f"tunein_stream_{preset_id}"
+ if cache := await self.mass.cache.get(cache_key):
+ return cache
+ result = (await self.__get_data("Tune.ashx", id=preset_id))["body"]
+ await self.mass.cache.set(cache_key, result)
+ return result
+
async def get_stream_details(self, item_id: str) -> StreamDetails:
"""Get streamdetails for a radio station."""
if item_id.startswith("http"):
path=item_id,
can_seek=False,
)
- stream_item_id, media_type = item_id.split("--", 1)
- stream_info = await self.__get_data("Tune.ashx", id=stream_item_id)
- for stream in stream_info["body"]:
- if stream["media_type"] != media_type:
+ if "--" in item_id:
+ stream_item_id, media_type = item_id.split("--", 1)
+ else:
+ media_type = None
+ stream_item_id = item_id
+ for stream in await self._get_stream_info(stream_item_id):
+ if media_type and stream["media_type"] != media_type:
continue
return StreamDetails(
provider=self.domain,