stderr=asyncio.subprocess.PIPE if self._enable_stderr else None,
close_fds=True,
)
+
+ # Fix BrokenPipeError due to a race condition
+ # by attaching a default done callback
+ def _done_cb(fut: asyncio.Future):
+ fut.exception()
+
+ self._proc._transport._protocol._stdin_closed.add_done_callback(_done_cb)
return self
async def __aexit__(self, exc_type, exc_value, traceback) -> bool:
from __future__ import annotations
import json
+import logging
import os
from collections.abc import AsyncGenerator
from dataclasses import dataclass
from music_assistant.common.models.enums import AlbumType
from music_assistant.common.models.errors import InvalidDataError
from music_assistant.common.models.media_items import MediaItemChapter
-from music_assistant.constants import UNKNOWN_ARTIST
+from music_assistant.constants import ROOT_LOGGER_NAME, UNKNOWN_ARTIST
from music_assistant.server.helpers.process import AsyncProcess
+LOGGER = logging.getLogger(ROOT_LOGGER_NAME).getChild("tags")
+
# the only multi-item splitter we accept is the semicolon,
# which is also the default in Musicbrainz Picard.
# the slash is also a common splitter but causes colissions with
# end of the file
# we'll have to read the entire file to do something with it
# for now we just ignore/deny these files
- raise RuntimeError("Tags not present at beginning of file")
+ LOGGER.error("Found file with tags not present at beginning of file")
+ break
finally:
proc.write_eof()
import importlib
import logging
from collections.abc import AsyncGenerator, Iterator
+from contextlib import suppress
from functools import lru_cache
from typing import TYPE_CHECKING, Any
# inspired by: https://stackoverflow.com/questions/62294385/synchronous-generator-in-asyncio
loop = asyncio.get_running_loop()
queue = asyncio.Queue(1)
+ _exit = asyncio.Event()
_end_ = object()
def iter_to_queue():
- try:
- for item in sync_iterator(*args, **kwargs):
- if queue is None:
- break
- asyncio.run_coroutine_threadsafe(queue.put(item), loop).result()
- finally:
- asyncio.run_coroutine_threadsafe(queue.put(_end_), loop).result()
+ for item in sync_iterator(*args, **kwargs):
+ if _exit.is_set():
+ return
+ asyncio.run_coroutine_threadsafe(queue.put(item), loop).result()
+ asyncio.run_coroutine_threadsafe(queue.put(_end_), loop).result()
iter_fut = loop.run_in_executor(None, iter_to_queue)
try:
break
yield next_item
finally:
- queue = None
+ # cleanup
+ _exit.set()
if not iter_fut.done():
iter_fut.cancel()
+ await iter_fut
+ with suppress(asyncio.QueueEmpty):
+ queue.get_nowait()