import os
import time
from collections import OrderedDict
-from collections.abc import Iterator, MutableMapping
-from typing import TYPE_CHECKING, Any
+from collections.abc import Callable, Iterator, MutableMapping
+from typing import TYPE_CHECKING, Any, ParamSpec, TypeVar
from music_assistant.common.helpers.json import json_dumps, json_loads
from music_assistant.common.models.config_entries import ConfigEntry, ConfigValueType
self.mass.loop.call_later(3600, self.__schedule_cleanup_task)
-def use_cache(expiration=86400 * 30):
+Param = ParamSpec("Param")
+RetType = TypeVar("RetType")
+
+
+def use_cache(
+ expiration: int = 86400 * 30,
+) -> Callable[[Callable[Param, RetType]], Callable[Param, RetType]]:
"""Return decorator that can be used to cache a method's result."""
- def wrapper(func):
+ def wrapper(func: Callable[Param, RetType]) -> Callable[Param, RetType]:
@functools.wraps(func)
- async def wrapped(*args, **kwargs):
+ async def wrapped(*args: Param.args, **kwargs: Param.kwargs):
method_class = args[0]
method_class_name = method_class.__class__.__name__
cache_key_parts = [method_class_name, func.__name__]
from __future__ import annotations
+from collections.abc import Sequence
from typing import TYPE_CHECKING
-from radios import FilterBy, Order, RadioBrowser, RadioBrowserError
+from radios import FilterBy, Order, RadioBrowser, RadioBrowserError, Station
from music_assistant.common.models.enums import LinkType, ProviderFeature, StreamType
+from music_assistant.common.models.errors import MediaNotFoundError
from music_assistant.common.models.media_items import (
AudioFormat,
BrowseFolder,
ProviderMapping,
Radio,
SearchResults,
+ UniqueList,
)
from music_assistant.common.models.streamdetails import StreamDetails
from music_assistant.server.controllers.cache import use_cache
self.logger.exception("%s", err)
async def search(
- self, search_query: str, media_types=list[MediaType], limit: int = 10
+ self, search_query: str, media_types: list[MediaType], limit: int = 10
) -> SearchResults:
"""Perform search on musicprovider.
return result
- async def browse(self, path: str, offset: int, limit: int) -> list[MediaItemType]:
+ async def browse(self, path: str, offset: int, limit: int) -> Sequence[MediaItemType]:
"""Browse this provider's items.
:param path: The path to browse, (e.g. provid://artists).
path=path + "/" + country.code.lower(),
name=country.name,
)
- folder.metadata.images = [
- MediaItemImage(
- type=ImageType.THUMB,
- path=country.favicon,
- provider=self.instance_id,
- remotely_accessible=True,
- )
- ]
+ folder.metadata.images = UniqueList(
+ [
+ MediaItemImage(
+ type=ImageType.THUMB,
+ path=country.favicon,
+ provider=self.instance_id,
+ remotely_accessible=True,
+ )
+ ]
+ )
items.append(folder)
return items
return []
@use_cache(3600 * 24)
- async def get_tag_names(self):
+ async def get_tag_names(self) -> Sequence[str]:
"""Get a list of tag names."""
tags = await self.radios.tags(
hide_broken=True,
return tag_names
@use_cache(3600 * 24)
- async def get_country_codes(self):
+ async def get_country_codes(self) -> Sequence[str]:
"""Get a list of country names."""
countries = await self.radios.countries(order=Order.NAME, hide_broken=True)
country_codes = []
return country_codes
@use_cache(3600)
- async def get_by_popularity(self):
+ async def get_by_popularity(self) -> Sequence[Radio]:
"""Get radio stations by popularity."""
stations = await self.radios.stations(
hide_broken=True,
return items
@use_cache(3600)
- async def get_by_tag(self, tag: str):
+ async def get_by_tag(self, tag: str) -> Sequence[Radio]:
"""Get radio stations by tag."""
items = []
stations = await self.radios.stations(
return items
@use_cache(3600)
- async def get_by_country(self, country_code: str):
+ async def get_by_country(self, country_code: str) -> list[Radio]:
"""Get radio stations by country."""
items = []
stations = await self.radios.stations(
async def get_radio(self, prov_radio_id: str) -> Radio:
"""Get radio station details."""
radio = await self.radios.station(uuid=prov_radio_id)
+ if not radio:
+ raise MediaNotFoundError(f"Radio station {prov_radio_id} not found")
return await self._parse_radio(radio)
- async def _parse_radio(self, radio_obj: dict) -> Radio:
+ async def _parse_radio(self, radio_obj: Station) -> Radio:
"""Parse Radio object from json obj returned from api."""
radio = Radio(
item_id=radio_obj.uuid,
)
},
)
- radio.metadata.label = radio_obj.tags
radio.metadata.popularity = radio_obj.votes
- radio.metadata.links = [MediaItemLink(type=LinkType.WEBSITE, url=radio_obj.homepage)]
- radio.metadata.images = [
- MediaItemImage(
- type=ImageType.THUMB,
- path=radio_obj.favicon,
- provider=self.instance_id,
- remotely_accessible=True,
- )
- ]
+ radio.metadata.links = {MediaItemLink(type=LinkType.WEBSITE, url=radio_obj.homepage)}
+ radio.metadata.images = UniqueList(
+ [
+ MediaItemImage(
+ type=ImageType.THUMB,
+ path=radio_obj.favicon,
+ provider=self.instance_id,
+ remotely_accessible=True,
+ )
+ ]
+ )
return radio
async def get_stream_details(self, item_id: str) -> StreamDetails:
"""Get streamdetails for a radio station."""
stream = await self.radios.station(uuid=item_id)
+ if not stream:
+ raise MediaNotFoundError(f"Radio station {item_id} not found")
await self.radios.station_click(uuid=item_id)
return StreamDetails(
provider=self.domain,