async def detect_charset(data: bytes, fallback="utf-8") -> str:
"""Detect charset of raw data."""
try:
- return (await asyncio.to_thread(chardet.detect, data))["encoding"]
- except (ImportError, AttributeError):
- return fallback
+ detected = await asyncio.to_thread(chardet.detect, data)
+ if detected and detected["encoding"] and detected["confidence"] > 0.75:
+ return detected["encoding"]
+ except Exception as err:
+ LOGGER.debug("Failed to detect charset: %s", err)
+ return fallback
class TaskManager: