API_BASE_URL = "api.audioaddict.com/v1"
API_TIMEOUT = 30
CACHE_CHANNELS = 86400 # 24 hours
+CACHE_GENRES = 86400 # 24 hours
CACHE_STREAM_URL = 3600 # 1 hour
# Rate limiting
# Validation constants
MIN_LISTEN_KEY_LENGTH = 10
-HTTPS_SCHEME_PREFIX = "//"
# Digitally Incorporated radio services configuration
NETWORKS = {
self.logger.debug(
"%s: Found %d channels for network %s", self.domain, len(channels), network_key
)
- radio_items: list[MediaItemType | BrowseFolder] = [
- self._channel_to_radio(ch, network_key) for ch in channels
- ]
+ radio_items: list[MediaItemType | BrowseFolder] = []
+ for ch in channels:
+ radio = self._channel_to_radio(ch, network_key)
+ radio_items.append(radio)
self.logger.debug("%s: Converted to %d radio items", self.domain, len(radio_items))
return radio_items
except (
@use_cache(CACHE_CHANNELS)
async def _get_channels(self, network_key: str) -> list[dict[str, Any]]:
- """Get listenable channels for a specific network (optimized single call)."""
+ """Get channels for a specific network with enriched genre data."""
try:
- # Get only listenable channels directly - no need for two API calls
- channels_response = await self._api_request(network_key, "listen/channels")
+ # Get all channel data (includes images, descriptions, etc.)
+ channels_response = await self._api_request(network_key, "channels")
if not channels_response or not isinstance(channels_response, list):
self.logger.warning("No channels returned for network %s", network_key)
return []
- # Ensure all items are dictionaries
- channels: list[dict[str, Any]] = [
- ch for ch in channels_response if isinstance(ch, dict)
- ]
+ self.logger.debug(
+ "%s: Retrieved %d channels for network %s",
+ self.domain,
+ len(channels_response),
+ network_key,
+ )
+
+ # Get genre filters and create a mapping
+ genre_mapping = await self._get_genre_mapping(network_key)
+
+ # Filter and enrich channels with genre data
+ channels: list[dict[str, Any]] = []
+ for ch in channels_response:
+ if not isinstance(ch, dict) or self._is_disabled_channel(ch):
+ continue
+
+ # Enrich channel with genre names
+ channel_filter_ids = ch.get("channel_filter_ids", [])
+ if channel_filter_ids and genre_mapping:
+ genres = [
+ genre_mapping[filter_id]
+ for filter_id in channel_filter_ids
+ if filter_id in genre_mapping
+ ]
+ if genres:
+ ch["genres"] = genres
+
+ channels.append(ch)
+
+ self.logger.debug(
+ "%s: Processed %d channels for network %s",
+ self.domain,
+ len(channels),
+ network_key,
+ )
+
return channels
except (ProviderUnavailableError, MediaNotFoundError, aiohttp.ClientError) as err:
self.logger.error("Failed to get channels for network %s: %s", network_key, err)
raise
+ @use_cache(CACHE_GENRES)
+ async def _get_channel_filters(self, network_key: str) -> list[dict[str, Any]]:
+ """Get channel filters (genre information) for a specific network."""
+ try:
+ # Get genre/filter data
+ filters_response = await self._api_request(network_key, "channel_filters")
+
+ if not filters_response or not isinstance(filters_response, list):
+ self.logger.warning("No channel filters returned for network %s", network_key)
+ return []
+
+ self.logger.debug(
+ "%s: Retrieved %d channel filters for network %s",
+ self.domain,
+ len(filters_response),
+ network_key,
+ )
+
+ # Ensure all items are dictionaries and filter for actual genres
+ # (genre=True indicates genre categories vs meta categories e.g. "Favorite" or "All")
+ genre_filters: list[dict[str, Any]] = [
+ f for f in filters_response if isinstance(f, dict) and f.get("genre", False)
+ ]
+
+ self.logger.debug(
+ "%s: Found %d genre filters (out of %d total filters) for network %s",
+ self.domain,
+ len(genre_filters),
+ len(filters_response),
+ network_key,
+ )
+
+ return genre_filters
+
+ except (ProviderUnavailableError, MediaNotFoundError, aiohttp.ClientError) as err:
+ self.logger.error("Failed to get channel filters for network %s: %s", network_key, err)
+ raise
+
+ async def _get_genre_mapping(self, network_key: str) -> dict[int, str]:
+ """Get a mapping of filter ID to genre name for a network."""
+ try:
+ genre_filters = await self._get_channel_filters(network_key)
+
+ # Create a mapping of filter ID to genre name
+ mapping = {
+ f["id"]: f["name"]
+ for f in genre_filters
+ if isinstance(f, dict) and "id" in f and "name" in f
+ }
+
+ self.logger.debug(
+ "%s: Created genre mapping with %d entries for network %s",
+ self.domain,
+ len(mapping),
+ network_key,
+ )
+
+ return mapping
+
+ except (ProviderUnavailableError, MediaNotFoundError, aiohttp.ClientError) as err:
+ self.logger.warning(
+ "%s: Failed to get genre mapping for network %s: %s",
+ self.domain,
+ network_key,
+ err,
+ )
+ return {}
+
@use_cache(CACHE_STREAM_URL)
async def _get_stream_url(self, network_key: str, channel_key: str) -> str:
"""Get the streaming URL for a channel."""
prov_id = f"{network_key}:{channel_key}"
channel_name = str(channel_data.get("name", "Unknown"))
- network_info = NETWORKS[network_key]
- # Create metadata with optional image
+ # Create metadata with optional image and genres
metadata = MediaItemMetadata(
- description=f"{network_info['description']} - {channel_name}",
+ description=self._get_description(channel_data),
explicit=False,
)
+ # Add genre information from enriched channel data
+ genres = channel_data.get("genres", [])
+ if genres:
+ metadata.genres = set(genres)
+ self.logger.debug("%s: Added genres %s for channel %s", self.domain, genres, prov_id)
+
# Process image URL if available
image_url = self._extract_image_url(channel_data)
if image_url:
+ self.logger.debug(
+ "%s: Found image URL for channel %s: %s", self.domain, prov_id, image_url
+ )
metadata.images = UniqueList(
[
MediaItemImage(
)
]
)
+ else:
+ self.logger.debug("%s: No valid image URL found for channel %s", self.domain, prov_id)
return Radio(
item_id=prov_id,
metadata=metadata,
)
+ def _is_disabled_channel(self, channel_data: dict[str, Any]) -> bool:
+ """Check if a channel is disabled based on its name pattern."""
+ name = channel_data.get("name")
+ if not name or not isinstance(name, str) or len(name) < 2:
+ return False
+
+ # Disabled channels have names starting with 'X' followed by a (capitalized) channel name.
+ return bool(name[0] == "X" and name[1].isupper())
+
+ def _get_description(self, channel_data: dict[str, Any]) -> str:
+ """Get combined description from channel data."""
+ short_desc = channel_data.get("description_short", "")
+ long_desc = channel_data.get("description_long", "")
+
+ if not long_desc or long_desc == short_desc:
+ return str(short_desc)
+
+ return f"{short_desc}\n\n{long_desc}"
+
def _extract_image_url(self, channel_data: dict[str, Any]) -> str | None:
"""Extract and normalize image URL from channel data."""
images = channel_data.get("images")
if not images or not isinstance(images, dict):
return None
- image_url = images.get("default")
+ image_url = images.get("square")
if not image_url or not isinstance(image_url, str):
return None
- # Add protocol if missing
- if image_url.startswith(HTTPS_SCHEME_PREFIX):
+ # Add protocol if missing (AudioAddict returns URLs starting with //)
+ if image_url.startswith("//"):
image_url = f"https:{image_url}"
- # Remove template parts if present
+ # Remove template parts if present (URLs may contain {size} placeholders)
return str(image_url.split("{")[0])