queue = self._queues[queue_id]
queue_items = self._queue_items[queue_id]
resume_item = queue.current_item
- next_item = queue.next_item
resume_pos = queue.elapsed_time
- if (
- resume_item
- and next_item
- and resume_item.duration
- and resume_pos > (resume_item.duration * 0.9)
- ):
- # track is already played for > 90% - skip to next
- resume_item = next_item
- resume_pos = 0
- elif not resume_item and queue.current_index is not None and len(queue_items) > 0:
+
+ if not resume_item and queue.current_index is not None and len(queue_items) > 0:
resume_item = self.get_item(queue_id, queue.current_index)
resume_pos = 0
elif not resume_item and queue.current_index is None and len(queue_items) > 0:
async def check_audio_support() -> tuple[bool, bool, str]:
"""Check if ffmpeg is present (with/without libsoxr support)."""
# check for FFmpeg presence
- returncode, output = await check_output(["ffmpeg", "-version"])
+ returncode, output = await check_output("ffmpeg", "-version")
ffmpeg_present = returncode == 0 and "FFmpeg" in output.decode()
# use globals as in-memory cache
return self._returncode
-async def check_output(args: str | list[str]) -> tuple[int, bytes]:
+async def check_output(*args: str) -> tuple[int, bytes]:
"""Run subprocess and return returncode and output."""
- if isinstance(args, str):
- proc = await asyncio.create_subprocess_shell(
- args,
- stderr=asyncio.subprocess.STDOUT,
- stdout=asyncio.subprocess.PIPE,
- )
- else:
- proc = await asyncio.create_subprocess_exec(
- *args,
- stderr=asyncio.subprocess.STDOUT,
- stdout=asyncio.subprocess.PIPE,
- )
+ proc = await asyncio.create_subprocess_exec(
+ *args,
+ stderr=asyncio.subprocess.STDOUT,
+ stdout=asyncio.subprocess.PIPE,
+ )
stdout, _ = await proc.communicate()
return (proc.returncode, stdout)
async def communicate(
- args: str | list[str],
+ args: list[str],
input: bytes | None = None, # noqa: A002
) -> tuple[int, bytes, bytes]:
"""Communicate with subprocess and return returncode, stdout and stderr output."""
- if isinstance(args, str):
- proc = await asyncio.create_subprocess_shell(
- args,
- stderr=asyncio.subprocess.PIPE,
- stdout=asyncio.subprocess.PIPE,
- stdin=asyncio.subprocess.PIPE if input is not None else None,
- )
- else:
- proc = await asyncio.create_subprocess_exec(
- *args,
- stderr=asyncio.subprocess.PIPE,
- stdout=asyncio.subprocess.PIPE,
- stdin=asyncio.subprocess.PIPE if input is not None else None,
- )
+ proc = await asyncio.create_subprocess_exec(
+ *args,
+ stderr=asyncio.subprocess.PIPE,
+ stdout=asyncio.subprocess.PIPE,
+ stdin=asyncio.subprocess.PIPE if input is not None else None,
+ )
stdout, stderr = await proc.communicate(input)
return (proc.returncode, stdout, stderr)
"""Install package with pip, raise when install failed."""
LOGGER.debug("Installing python package %s", package)
args = ["pip", "install", "--find-links", HA_WHEELS, package]
- return_code, output = await check_output(args)
+ return_code, output = await check_output(*args)
if return_code != 0:
msg = f"Failed to install package {package}\n{output.decode()}"
async def check_binary(cliraop_path: str) -> str | None:
try:
returncode, output = await check_output(
- [cliraop_path, "-check"],
+ cliraop_path,
+ "-check",
)
if returncode == 0 and output.strip().decode() == "cliraop check":
self.cliraop_bin = cliraop_path
[m.replace(password, "########") if password else m for m in mount_cmd],
)
- returncode, output = await check_output(mount_cmd)
+ returncode, output = await check_output(*mount_cmd)
if returncode != 0:
msg = f"SMB mount failed with error: {output.decode()}"
raise LoginFailed(msg)
async def unmount(self, ignore_error: bool = False) -> None:
"""Unmount the remote share."""
- returncode, output = await check_output(["umount", self.base_path])
+ returncode, output = await check_output("umount", self.base_path)
if returncode != 0 and not ignore_error:
self.logger.warning("SMB unmount failed with error: %s", output.decode())
return result
- @use_cache(86400 * 7)
async def browse(self, path: str, offset: int, limit: int) -> list[MediaItemType]:
"""Browse this provider's items.
:param path: The path to browse, (e.g. provid://artists).
"""
+ if offset != 0:
+ # paging is broken on RadioBrowser, we just return some big lists
+ return []
subpath = path.split("://", 1)[1]
subsubpath = "" if "/" not in subpath else subpath.split("/")[-1]
]
if subpath == "popular":
- return await self.get_by_popularity(limit=limit, offset=offset)
+ return await self.get_by_popularity()
if subpath == "tag":
tags = await self.radios.tags(
hide_broken=True,
- limit=limit,
- offset=offset,
order=Order.STATION_COUNT,
reverse=True,
)
if subpath == "country":
items: list[BrowseFolder | Radio] = []
- for country in await self.radios.countries(
- order=Order.NAME, hide_broken=True, limit=limit, offset=offset
- ):
+ for country in await self.radios.countries(order=Order.NAME, hide_broken=True):
folder = BrowseFolder(
item_id=country.code.lower(),
provider=self.domain,
items.append(folder)
return items
- if subsubpath in await self.get_tag_names(limit=limit, offset=offset):
+ if subsubpath in await self.get_tag_names():
return await self.get_by_tag(subsubpath)
- if subsubpath in await self.get_country_codes(limit=limit, offset=offset):
+ if subsubpath in await self.get_country_codes():
return await self.get_by_country(subsubpath)
return []
- async def get_tag_names(self, limit: int, offset: int):
+ @use_cache(3600 * 24)
+ async def get_tag_names(self):
"""Get a list of tag names."""
tags = await self.radios.tags(
hide_broken=True,
- limit=limit,
- offset=offset,
+ limit=10000,
order=Order.STATION_COUNT,
reverse=True,
)
tag_names.append(tag.name.lower())
return tag_names
- async def get_country_codes(self, limit: int, offset: int):
+ @use_cache(3600 * 24)
+ async def get_country_codes(self):
"""Get a list of country names."""
- countries = await self.radios.countries(
- order=Order.NAME, hide_broken=True, limit=limit, offset=offset
- )
+ countries = await self.radios.countries(order=Order.NAME, hide_broken=True)
country_codes = []
for country in countries:
country_codes.append(country.code.lower())
return country_codes
- async def get_by_popularity(self, limit: int, offset: int):
+ @use_cache(3600)
+ async def get_by_popularity(self):
"""Get radio stations by popularity."""
stations = await self.radios.stations(
hide_broken=True,
- limit=limit,
- offset=offset,
+ limit=5000,
order=Order.CLICK_COUNT,
reverse=True,
)
items.append(await self._parse_radio(station))
return items
+ @use_cache(3600)
async def get_by_tag(self, tag: str):
"""Get radio stations by tag."""
items = []
items.append(await self._parse_radio(station))
return items
+ @use_cache(3600)
async def get_by_country(self, country_code: str):
"""Get radio stations by country."""
items = []
action: [optional] action key called from config entries UI.
values: the (intermediate) raw values for config entries sent with the action.
"""
- returncode, output = await check_output(["snapserver", "-v"])
+ returncode, output = await check_output("snapserver", "-v")
snapserver_version = int(output.decode().split(".")[1]) if returncode == 0 else -1
local_snapserver_present = snapserver_version >= 27
if returncode == 0 and not local_snapserver_present:
]
if self._ap_workaround:
args += ["--ap-port", "12345"]
- _returncode, output = await check_output(args)
+ _returncode, output = await check_output(*args)
if _returncode == 0 and output.decode().strip() != "authorized":
raise LoginFailed(f"Login failed for username {self.config.get_value(CONF_USERNAME)}")
# get token with (authorized) librespot
]
if self._ap_workaround:
args += ["--ap-port", "12345"]
- _returncode, output = await check_output(args)
+ _returncode, output = await check_output(*args)
duration = round(time.time() - time_start, 2)
try:
result = json.loads(output)
async def check_librespot(librespot_path: str) -> str | None:
try:
- returncode, output = await check_output([librespot_path, "--check"])
+ returncode, output = await check_output(librespot_path, "--check")
if returncode == 0 and b"ok spotty" in output and b"using librespot" in output:
self._librespot_bin = librespot_path
return librespot_path