Fix system lockup caused by SMB Provider (#591)
authorMarcel van der Veldt <m.vanderveldt@outlook.com>
Tue, 28 Mar 2023 20:32:58 +0000 (22:32 +0200)
committerGitHub <noreply@github.com>
Tue, 28 Mar 2023 20:32:58 +0000 (22:32 +0200)
* Fix: SMB Files provider deadlock

* prevent annoying BrokenPipe error

music_assistant/server/helpers/process.py
music_assistant/server/helpers/tags.py
music_assistant/server/helpers/util.py

index e082e5f2f9c577a34fb77db335bfb1961391dc48..164cc44e2f44a0873fa9ace06bf133e7a253b9e2 100644 (file)
@@ -45,6 +45,13 @@ class AsyncProcess:
             stderr=asyncio.subprocess.PIPE if self._enable_stderr else None,
             close_fds=True,
         )
+
+        # Fix BrokenPipeError due to a race condition
+        # by attaching a default done callback
+        def _done_cb(fut: asyncio.Future):
+            fut.exception()
+
+        self._proc._transport._protocol._stdin_closed.add_done_callback(_done_cb)
         return self
 
     async def __aexit__(self, exc_type, exc_value, traceback) -> bool:
index 00c2d5828688f06872776ce27c72fe448be1a071..e4fa31e5027e866b21694ad94a78b9a4f9d8c006 100644 (file)
@@ -2,6 +2,7 @@
 from __future__ import annotations
 
 import json
+import logging
 import os
 from collections.abc import AsyncGenerator
 from dataclasses import dataclass
@@ -12,9 +13,11 @@ from music_assistant.common.helpers.util import try_parse_int
 from music_assistant.common.models.enums import AlbumType
 from music_assistant.common.models.errors import InvalidDataError
 from music_assistant.common.models.media_items import MediaItemChapter
-from music_assistant.constants import UNKNOWN_ARTIST
+from music_assistant.constants import ROOT_LOGGER_NAME, UNKNOWN_ARTIST
 from music_assistant.server.helpers.process import AsyncProcess
 
+LOGGER = logging.getLogger(ROOT_LOGGER_NAME).getChild("tags")
+
 # the only multi-item splitter we accept is the semicolon,
 # which is also the default in Musicbrainz Picard.
 # the slash is also a common splitter but causes colissions with
@@ -328,7 +331,8 @@ async def parse_tags(
                             # end of the file
                             # we'll have to read the entire file to do something with it
                             # for now we just ignore/deny these files
-                            raise RuntimeError("Tags not present at beginning of file")
+                            LOGGER.error("Found file with tags not present at beginning of file")
+                            break
                 finally:
                     proc.write_eof()
 
index b35a9a9ac1fd5a1afb506fd741b0e77f24282b80..0ed59a549ef72cecf5656cfb7deca27f989383c2 100644 (file)
@@ -5,6 +5,7 @@ import asyncio
 import importlib
 import logging
 from collections.abc import AsyncGenerator, Iterator
+from contextlib import suppress
 from functools import lru_cache
 from typing import TYPE_CHECKING, Any
 
@@ -45,16 +46,15 @@ async def async_iter(sync_iterator: Iterator, *args, **kwargs) -> AsyncGenerator
     # inspired by: https://stackoverflow.com/questions/62294385/synchronous-generator-in-asyncio
     loop = asyncio.get_running_loop()
     queue = asyncio.Queue(1)
+    _exit = asyncio.Event()
     _end_ = object()
 
     def iter_to_queue():
-        try:
-            for item in sync_iterator(*args, **kwargs):
-                if queue is None:
-                    break
-                asyncio.run_coroutine_threadsafe(queue.put(item), loop).result()
-        finally:
-            asyncio.run_coroutine_threadsafe(queue.put(_end_), loop).result()
+        for item in sync_iterator(*args, **kwargs):
+            if _exit.is_set():
+                return
+            asyncio.run_coroutine_threadsafe(queue.put(item), loop).result()
+        asyncio.run_coroutine_threadsafe(queue.put(_end_), loop).result()
 
     iter_fut = loop.run_in_executor(None, iter_to_queue)
     try:
@@ -64,6 +64,10 @@ async def async_iter(sync_iterator: Iterator, *args, **kwargs) -> AsyncGenerator
                 break
             yield next_item
     finally:
-        queue = None
+        # cleanup
+        _exit.set()
         if not iter_fut.done():
             iter_fut.cancel()
+            await iter_fut
+        with suppress(asyncio.QueueEmpty):
+            queue.get_nowait()