# pylint: disable=invalid-name
-SCHEMA_VERSION = 5
+SCHEMA_VERSION = 6
TABLE_PROV_MAPPINGS = "provider_mappings"
TABLE_TRACK_LOUDNESS = "track_loudness"
table: str,
match: dict = None,
db: Optional[Db] = None,
- ) -> List[Mapping]:
+ ) -> int:
"""Get row count for given table/query."""
async with self.get_db(db) as _db:
sql_query = f"SELECT count() FROM {table}"
# delete player_settings table: use generic settings table instead
await db.execute("DROP TABLE IF EXISTS queue_settings")
+ if prev_version < 6:
+ # recreate radio items due to some changes
+ await db.execute(f"DROP TABLE IF EXISTS {TABLE_RADIOS}")
+ match = {"media_type": "radio"}
+ if await self.get_count(TABLE_PROV_MAPPINGS, match):
+ await self.delete(TABLE_PROV_MAPPINGS, match, db=db)
+
# create db tables
await self.__create_database_tables(db)
# store current schema version
async def get_provider_item(self, item_id: str, provider_id: str) -> ItemCls:
"""Return item details for the given provider item id."""
if provider_id == "database":
- return await self.get_db_item(item_id)
- provider = self.mass.music.get_provider(provider_id)
- if not provider:
- raise ProviderUnavailableError(f"Provider {provider_id} is not available!")
- item = await provider.get_item(self.media_type, item_id)
+ item = await self.get_db_item(item_id)
+ else:
+ provider = self.mass.music.get_provider(provider_id)
+ if not provider:
+ raise ProviderUnavailableError(
+ f"Provider {provider_id} is not available!"
+ )
+ item = await provider.get_item(self.media_type, item_id)
if not item:
raise MediaNotFoundError(
f"{self.media_type.value} {item_id} not found on provider {provider_id}"
from asyncio_throttle import Throttler
from music_assistant.helpers.cache import use_cache
+from music_assistant.helpers.util import create_sort_name
from music_assistant.models.media_items import (
ContentType,
ImageType,
async def get_library_radios(self) -> List[Radio]:
"""Retrieve library/subscribed radio stations from the provider."""
- async def parse_api_response(resp: dict, folder: str = None) -> List[Radio]:
+ async def parse_items(items: List[dict], folder: str = None) -> List[Radio]:
result = []
- if not resp or "body" not in resp:
- return result
- for item in resp["body"]:
+ for item in items:
item_type = item.get("type", "")
if item_type == "audio":
+ if "preset_id" not in item:
+ continue
# each radio station can have multiple streams add each one as different quality
- stream_info = await self._get_data(
+ stream_info = await self.__get_data(
"Tune.ashx", id=item["preset_id"]
)
for stream in stream_info["body"]:
result.append(await self._parse_radio(item, stream, folder))
elif item_type == "link":
- # stations are in sublevel
- sublevel = await self._get_data(item["URL"], render="json")
- result += await parse_api_response(sublevel, item["text"])
+ # stations are in sublevel (new style)
+ if sublevel := await self.__get_data(item["URL"], render="json"):
+ result += await parse_items(sublevel["body"], item["text"])
+ elif item.get("children"):
+ # stations are in sublevel (old style ?)
+ result += await parse_items(item["children"], item["text"])
return result
- data = await self._get_data("Browse.ashx", c="presets")
- items = await parse_api_response(data)
- return items
+ data = await self.__get_data("Browse.ashx", c="presets")
+ if data and "body" in data:
+ return await parse_items(data["body"])
+ return []
async def get_radio(self, prov_radio_id: str) -> Radio:
"""Get radio station details."""
prov_radio_id, media_type = prov_radio_id.split("--", 1)
params = {"c": "composite", "detail": "listing", "id": prov_radio_id}
- result = await self._get_data("Describe.ashx", **params)
+ result = await self.__get_data("Describe.ashx", **params)
if result and result.get("body") and result["body"][0].get("children"):
item = result["body"][0]["children"][0]
- stream_info = await self._get_data("Tune.ashx", id=prov_radio_id)
+ stream_info = await self.__get_data("Tune.ashx", id=prov_radio_id)
for stream in stream_info["body"]:
if stream["media_type"] != media_type:
continue
details=stream["url"],
)
)
- if folder:
+ # preset number is used for sorting (not present at stream time)
+ preset_number = details.get("preset_number")
+ if preset_number and folder:
radio.sort_name = f'{folder}-{details["preset_number"]}'
- else:
+ elif preset_number:
radio.sort_name = details["preset_number"]
- radio.metadata.description = details["text"]
+ radio.sort_name += create_sort_name(name)
+ if "text" in details:
+ radio.metadata.description = details["text"]
# images
if img := details.get("image"):
radio.metadata.images = {MediaItemImage(ImageType.THUMB, img)}
async def get_stream_details(self, item_id: str) -> StreamDetails:
"""Get streamdetails for a radio station."""
item_id, media_type = item_id.split("--", 1)
- stream_info = await self._get_data("Tune.ashx", id=item_id)
+ stream_info = await self.__get_data("Tune.ashx", id=item_id)
for stream in stream_info["body"]:
if stream["media_type"] == media_type:
return StreamDetails(
return None
@use_cache(3600 * 2)
- async def _get_data(self, endpoint: str, **kwargs):
+ async def __get_data(self, endpoint: str, **kwargs):
"""Get data from api."""
if endpoint.startswith("http"):
url = endpoint