import asyncio
import logging
from collections.abc import AsyncGenerator, Coroutine
+from contextlib import suppress
LOGGER = logging.getLogger(__name__)
await self._proc.communicate()
if self._proc.returncode is None:
self._proc.kill()
+ if self._attached_task and not self._attached_task.done():
+ with suppress(asyncio.CancelledError):
+ self._attached_task.cancel()
async def iter_chunked(self, n: int = DEFAULT_CHUNKSIZE) -> AsyncGenerator[bytes, None]:
"""Yield chunks of n size from the process stdout."""
from functools import lru_cache
from typing import TYPE_CHECKING, Any
+from music_assistant.common.helpers.util import empty_queue
+
if TYPE_CHECKING:
from music_assistant.server.models import ProviderModuleType
return importlib.import_module(f".{domain}", "music_assistant.server.providers")
return await asyncio.to_thread(_get_provider_module, domain)
-
-
-async def async_iter(sync_iterator: Iterator, *args, **kwargs) -> AsyncGenerator[Any, None]:
- """Wrap blocking iterator into an asynchronous one."""
- # inspired by: https://stackoverflow.com/questions/62294385/synchronous-generator-in-asyncio
- loop = asyncio.get_running_loop()
- queue = asyncio.Queue(1)
- _exit = asyncio.Event()
- _end_ = object()
-
- def iter_to_queue():
- for item in sync_iterator(*args, **kwargs):
- if _exit.is_set():
- return
- asyncio.run_coroutine_threadsafe(queue.put(item), loop).result()
- asyncio.run_coroutine_threadsafe(queue.put(_end_), loop).result()
-
- iter_fut = loop.run_in_executor(None, iter_to_queue)
- try:
- while True:
- next_item = await queue.get()
- if next_item is _end_:
- break
- yield next_item
- finally:
- # cleanup
- _exit.set()
- if not iter_fut.done():
- iter_fut.cancel()
- await iter_fut
- with suppress(asyncio.QueueEmpty):
- queue.get_nowait()
from music_assistant.common.models.enums import ConfigEntryType
from music_assistant.common.models.errors import SetupFailedError
from music_assistant.constants import CONF_PATH
-from music_assistant.server.helpers.util import async_iter
from .base import (
CONF_ENTRY_MISSING_ALBUM_ARTIST,
"""
abs_path = get_absolute_path(self.config.get_value(CONF_PATH), path)
- async for entry in async_iter(os.scandir, abs_path):
+ self.logger.debug("Processing: %s", abs_path)
+ entries = await asyncio.to_thread(os.scandir, abs_path)
+ for entry in entries:
if entry.name.startswith(".") or any(x in entry.name for x in IGNORE_DIRS):
# skip invalid/system files and dirs
continue
-
item = await create_item(self.config.get_value(CONF_PATH), entry)
if recursive and item.is_dir:
try:
import logging
import os
from collections.abc import AsyncGenerator
+from contextlib import suppress
from os.path import basename
from typing import TYPE_CHECKING
import smbclient
from smbclient import path as smbpath
-from music_assistant.common.helpers.util import get_ip_from_host
+from music_assistant.common.helpers.util import empty_queue, get_ip_from_host
from music_assistant.common.models.config_entries import ConfigEntry
from music_assistant.common.models.enums import ConfigEntryType
from music_assistant.common.models.errors import LoginFailed
from music_assistant.constants import CONF_PASSWORD, CONF_USERNAME
from music_assistant.server.controllers.cache import use_cache
-from music_assistant.server.helpers.util import async_iter
from music_assistant.server.providers.filesystem_local.base import (
CONF_ENTRY_MISSING_ALBUM_ARTIST,
IGNORE_DIRS,
"""
abs_path = get_absolute_path(self._root_path, path)
- async for entry in async_iter(smbclient.scandir, abs_path):
+ self.logger.debug("Processing: %s", abs_path)
+ entries = await asyncio.to_thread(smbclient.scandir, abs_path)
+ for entry in entries:
if entry.name.startswith(".") or any(x in entry.name for x in IGNORE_DIRS):
# skip invalid/system files and dirs
continue
file_path = file_path.replace("\\", os.sep)
absolute_path = get_absolute_path(self._root_path, file_path)
+ queue = asyncio.Queue(1)
+
def _reader():
self.logger.debug("Reading file contents for %s", absolute_path)
try:
while True:
chunk = _file.read(chunk_size)
if not chunk:
- break
- yield chunk
+ return
+ asyncio.run_coroutine_threadsafe(queue.put(chunk), self.mass.loop).result()
bytes_sent += len(chunk)
finally:
+ asyncio.run_coroutine_threadsafe(queue.put(b""), self.mass.loop).result()
self.logger.debug(
"Finished Reading file contents for %s - bytes transferred: %s",
absolute_path,
bytes_sent,
)
- async for chunk in async_iter(_reader):
- yield chunk
+ try:
+ task = self.mass.create_task(_reader)
+
+ while True:
+ chunk = await queue.get()
+ if not chunk:
+ break
+ yield chunk
+ finally:
+ empty_queue(queue)
+ if task and not task.done():
+ task.cancel()
+ with suppress(asyncio.CancelledError):
+ await task
+ del queue
async def write_file_content(self, file_path: str, data: bytes) -> None:
"""Write entire file content as bytes (e.g. for playlists)."""