from .process import AsyncProcess, check_output, communicate
from .tags import parse_tags
from .throttle_retry import BYPASS_THROTTLER
-from .util import TimedAsyncGenerator, create_tempfile
+from .util import TimedAsyncGenerator, create_tempfile, detect_charset
if TYPE_CHECKING:
from music_assistant.common.models.player_queue import QueueItem
substream_url, headers=HTTP_HEADERS, timeout=timeout
) as resp:
resp.raise_for_status()
- charset = resp.charset or "utf-8"
- substream_m3u_data = await resp.text(charset)
+ raw_data = await resp.read()
+ encoding = resp.charset or await detect_charset(raw_data)
+ substream_m3u_data = raw_data.decode(encoding)
# get chunk-parts from the substream
hls_chunks = parse_m3u(substream_m3u_data)
chunk_seconds = 0
url, allow_redirects=True, headers=HTTP_HEADERS, timeout=timeout
) as resp:
resp.raise_for_status()
- charset = resp.charset or "utf-8"
- master_m3u_data = await resp.text(charset)
+ raw_data = await resp.read()
+ encoding = resp.charset or await detect_charset(raw_data)
+ master_m3u_data = raw_data.decode(encoding)
substreams = parse_m3u(master_m3u_data)
if any(x for x in substreams if x.length and not x.key):
# this is already a substream!
from types import TracebackType
from typing import TYPE_CHECKING, Any, ParamSpec, Self, TypeVar
+import cchardet as chardet
import ifaddr
import memory_tempfile
from zeroconf import IPVersion
await agen.aclose()
+async def detect_charset(data: bytes, fallback="utf-8") -> str:
+ """Detect charset of raw data."""
+ try:
+ return (await asyncio.to_thread(chardet.detect, data))["encoding"]
+ except (ImportError, AttributeError):
+ return fallback
+
+
class TaskManager:
"""
Helper class to run many tasks at once.