from .process import AsyncProcess, check_output, communicate
from .tags import parse_tags
from .throttle_retry import BYPASS_THROTTLER
-from .util import TimedAsyncGenerator, create_tempfile
+from .util import TimedAsyncGenerator, create_tempfile, detect_charset
if TYPE_CHECKING:
from music_assistant.common.models.player_queue import QueueItem
substream_url, headers=HTTP_HEADERS, timeout=timeout
) as resp:
resp.raise_for_status()
- charset = resp.charset or "utf-8"
- substream_m3u_data = await resp.text(charset)
+ raw_data = await resp.read()
+ encoding = resp.charset or await detect_charset(raw_data)
+ substream_m3u_data = raw_data.decode(encoding)
# get chunk-parts from the substream
hls_chunks = parse_m3u(substream_m3u_data)
chunk_seconds = 0
url, allow_redirects=True, headers=HTTP_HEADERS, timeout=timeout
) as resp:
resp.raise_for_status()
- charset = resp.charset or "utf-8"
- master_m3u_data = await resp.text(charset)
+ raw_data = await resp.read()
+ encoding = resp.charset or await detect_charset(raw_data)
+ master_m3u_data = raw_data.decode(encoding)
substreams = parse_m3u(master_m3u_data)
if any(x for x in substreams if x.length and not x.key):
# this is already a substream!
from aiohttp import client_exceptions
from music_assistant.common.models.errors import InvalidDataError
+from music_assistant.server.helpers.util import detect_charset
if TYPE_CHECKING:
from music_assistant.server import MusicAssistant
"""Parse an online m3u or pls playlist."""
try:
async with mass.http_session.get(url, allow_redirects=True, timeout=5) as resp:
- charset = resp.charset or "utf-8"
try:
- playlist_data = (await resp.content.read(64 * 1024)).decode(charset)
- except ValueError as err:
+ raw_data = await resp.content.read(64 * 1024)
+ # NOTE: using resp.charset is not reliable, we need to detect it ourselves
+ encoding = resp.charset or await detect_charset(raw_data)
+ playlist_data = raw_data.decode(encoding, errors="replace")
+ except (ValueError, UnicodeDecodeError) as err:
msg = f"Could not decode playlist {url}"
raise InvalidDataError(msg) from err
except TimeoutError as err:
from types import TracebackType
from typing import TYPE_CHECKING, Any, ParamSpec, Self, TypeVar
+import cchardet as chardet
import ifaddr
import memory_tempfile
from zeroconf import IPVersion
await agen.aclose()
+async def detect_charset(data: bytes, fallback="utf-8") -> str:
+ """Detect charset of raw data."""
+ try:
+ detected = await asyncio.to_thread(chardet.detect, data)
+ if detected and detected["encoding"] and detected["confidence"] > 0.75:
+ return detected["encoding"]
+ except Exception as err:
+ LOGGER.debug("Failed to detect charset: %s", err)
+ return fallback
+
+
class TaskManager:
"""
Helper class to run many tasks at once.