self.set(conf_key, value)
@api_command("config/providers/reload")
- async def reload_provider(self, instance_id: str, config: ProviderConfig | None) -> None:
+ async def reload_provider(self, instance_id: str) -> None:
"""Reload provider."""
config = await self.get_provider_config(instance_id)
await self._load_provider_config(config)
data=config,
)
# signal update to the player manager
- with suppress(PlayerUnavailableError, AttributeError):
+ with suppress(PlayerUnavailableError, AttributeError, KeyError):
player = self.mass.players.get(config.player_id)
if config.enabled:
player_prov = self.mass.players.get_player_provider(player_id)
async def setup(self, config: CoreConfig) -> None: # noqa: ARG002
"""Async initialize of module."""
- self.mass.streams.register_dynamic_route("/imageproxy", self._handle_imageproxy)
+ self.mass.streams.register_dynamic_route("/imageproxy", self.handle_imageproxy)
async def close(self) -> None:
"""Handle logic on server stop."""
thumbnail = f"data:image/png;base64,{enc_image}"
return thumbnail
- async def _handle_imageproxy(self, request: web.Request) -> web.Response:
+ async def handle_imageproxy(self, request: web.Request) -> web.Response:
"""Handle request for image proxy."""
path = request.query["path"]
provider = request.query.get("provider", "url")
routes.append(("GET", "/info", self._handle_server_info))
# add websocket api
routes.append(("GET", "/ws", self._handle_ws_client))
+ # also host the image proxy on the webserver
+ routes.append(("GET", "/imageproxy", self.mass.metadata.handle_imageproxy))
# start the webserver
await self._server.setup(
bind_ip=config.get_value(CONF_BIND_IP),
@classmethod
def parse(cls, raw: dict) -> AudioTags:
"""Parse instance from raw ffmpeg info output."""
- audio_stream = next(x for x in raw["streams"] if x["codec_type"] == "audio")
+ audio_stream = next((x for x in raw["streams"] if x["codec_type"] == "audio"), None)
+ if audio_stream is None:
+ raise InvalidDataError("No audio stream found")
has_cover_image = any(x for x in raw["streams"] if x["codec_name"] in ("mjpeg", "png"))
# convert all tag-keys (gathered from all streams) to lowercase without spaces
tags = {}
from music_assistant.common.helpers.util import create_sort_name
from music_assistant.common.models.config_entries import ConfigEntry, ConfigValueType
from music_assistant.common.models.enums import ConfigEntryType, ProviderFeature
-from music_assistant.common.models.errors import LoginFailed, MediaNotFoundError
+from music_assistant.common.models.errors import InvalidDataError, LoginFailed, MediaNotFoundError
from music_assistant.common.models.media_items import (
AudioFormat,
ContentType,
if item_type == "audio":
if "preset_id" not in item:
continue
+ if "- Not Supported" in item.get("name", ""):
+ continue
+ if "- Not Supported" in item.get("text", ""):
+ continue
# each radio station can have multiple streams add each one as different quality
stream_info = await self.__get_data("Tune.ashx", id=item["preset_id"])
for stream in stream_info["body"]:
yield await self._parse_radio(item, stream, folder)
elif item_type == "link" and item.get("item") == "url":
# custom url
- yield await self._parse_radio(item)
+ try:
+ yield await self._parse_radio(item)
+ except InvalidDataError as err:
+ # there may be invalid custom urls, ignore those
+ self.logger.warning(str(err))
elif item_type == "link":
# stations are in sublevel (new style)
if sublevel := await self.__get_data(item["URL"], render="json"):