Final fix for the AsyncProcess helper
author Marcel van der Veldt <m.vanderveldt@outlook.com>
Wed, 7 Oct 2020 10:25:39 +0000 (12:25 +0200)
committer Marcel van der Veldt <m.vanderveldt@outlook.com>
Wed, 7 Oct 2020 10:25:39 +0000 (12:25 +0200)
music_assistant/helpers/process.py
music_assistant/managers/config.py
music_assistant/managers/music.py
music_assistant/managers/streams.py
music_assistant/mass.py
music_assistant/providers/spotify/__init__.py
music_assistant/web/endpoints/players.py

index d5e857bacd1da39db440b5f2050b5df6a3822bc1..3fddfc2b95e29ab09342f35560bc098ff4f23bc7 100644 (file)
@@ -36,21 +36,17 @@ class AsyncProcess(object):
         self._chunksize = chunksize
         self._enable_write = enable_write
         self._enable_shell = enable_shell
+        self.loop = asyncio.get_running_loop()
+        self.__queue_in = asyncio.Queue(4)
+        self.__queue_out = asyncio.Queue(8)
+        self.__proc_task = None
         self._exit = False
-        self._proc = None
         self._id = int(time.time())  # some identifier for logging
 
     async def __aenter__(self) -> "AsyncProcess":
         """Enter context manager, start running the process in executor."""
         LOGGER.debug("[%s] Entered context manager", self._id)
-        self._proc = subprocess.Popen(
-            self._process_args,
-            **{
-                "shell": self._enable_shell,
-                "stdout": subprocess.PIPE,
-                "stdin": subprocess.PIPE if self._enable_write else None,
-            },
-        )
+        self.__proc_task = self.loop.run_in_executor(None, self.__run_proc)
         return self
 
     async def __aexit__(self, exc_type, exc_value, traceback) -> bool:
@@ -64,168 +60,47 @@ class AsyncProcess(object):
             )
         else:
             LOGGER.debug("[%s] Context manager exit", self._id)
-        # signal that we must exit
-        self._exit = True
-
-        def close_proc():
-            if self._proc and self._proc.poll() is None:
-                # there is no real clean way to do this with all the blocking pipes
-                self._proc.kill()
 
-        await asyncio.get_running_loop().run_in_executor(None, close_proc)
+        self._exit = True
+        # prevent a deadlock by clearing the queues
+        while self.__queue_in.qsize():
+            await self.__queue_in.get()
+            self.__queue_in.task_done()
+        self.__queue_in.put_nowait(b"")
+        while self.__queue_out.qsize():
+            await self.__queue_out.get()
+            self.__queue_out.task_done()
+        await self.__proc_task
         LOGGER.debug("[%s] Cleanup finished", self._id)
         return True
 
     async def iterate_chunks(self) -> AsyncGenerator[bytes, None]:
         """Yield chunks from the output Queue. Generator."""
         LOGGER.debug("[%s] start reading from generator", self._id)
-        while not self._exit:
+        while True:
             chunk = await self.read()
             yield chunk
-            if len(chunk) < self._chunksize:
+            if not chunk or len(chunk) < self._chunksize:
                 break
         LOGGER.debug("[%s] finished reading from generator", self._id)
 
     async def read(self) -> bytes:
         """Read single chunk from the output Queue."""
-
-        def try_read():
-            try:
-                data = self._proc.stdout.read(self._chunksize)
-                return data
-            except BrokenPipeError:
-                return b""
-            except Exception as exc:  # pylint: disable=broad-except
-                LOGGER.exception(exc)
-                return b""
-
-        return await asyncio.get_running_loop().run_in_executor(None, try_read)
+        if self._exit:
+            raise RuntimeError("Already exited")
+        data = await self.__queue_out.get()
+        self.__queue_out.task_done()
+        return data
 
     async def write(self, data: bytes) -> None:
         """Write data to process."""
-
-        def try_write(_data):
-            try:
-                self._proc.stdin.write(_data)
-            except BrokenPipeError:
-                pass
-            except Exception as exc:  # pylint: disable=broad-except
-                LOGGER.exception(exc)
-
-        await asyncio.get_running_loop().run_in_executor(None, try_write, data)
+        if self._exit:
+            raise RuntimeError("Already exited")
+        await self.__queue_in.put(data)
 
     async def write_eof(self) -> None:
         """Write eof to process."""
-
-        def try_write():
-            try:
-                self._proc.stdin.close()
-            except BrokenPipeError:
-                pass
-            except Exception as exc:  # pylint: disable=broad-except
-                LOGGER.exception(exc)
-
-        await asyncio.get_running_loop().run_in_executor(None, try_write)
-
-    async def communicate(self, input_data: Optional[bytes] = None) -> bytes:
-        """Write bytes to process and read back results."""
-        if not self._enable_write and input_data:
-            raise RuntimeError("Write is disabled")
-        if input_data:
-            await self.write(input_data)
-        output = b""
-        async for chunk in self.iterate_chunks():
-            output += chunk
-        return output
-
-
-# first attempt with queues, too complicated
-# left here as reference
-class AsyncProcessWithQueues(object):
-    """Implementation of a (truly) non blocking subprocess."""
-
-    def __init__(
-        self,
-        process_args: List,
-        chunksize=512000,
-        enable_write: bool = False,
-        enable_shell=False,
-    ):
-        """Initialize."""
-        self._process_args = process_args
-        self._chunksize = chunksize
-        self._enable_write = enable_write
-        self._enable_shell = enable_shell
-        # we have large chunks, limit the queue size a bit.
-        import janus
-
-        self.__queue_in = janus.Queue(8)
-        self.__queue_out = janus.Queue(4)
-        self.__proc_task = None
-        self._exit = threading.Event()
-        self._id = int(time.time())  # some identifier for logging
-
-    async def __aenter__(self) -> "AsyncProcess":
-        """Enter context manager, start running the process in executor."""
-        LOGGER.debug("[%s] Entered context manager", self._id)
-        self.__proc_task = asyncio.get_running_loop().run_in_executor(None, self._run)
-        return self
-
-    async def __aexit__(self, exc_type, exc_value, traceback) -> bool:
-        """Exit context manager."""
-        if exc_type:
-            LOGGER.debug(
-                "[%s] Context manager exit with exception %s (%s)",
-                self._id,
-                exc_type,
-                str(exc_value),
-            )
-        else:
-            LOGGER.debug("[%s] Context manager exit", self._id)
-        # signal that we must exit
-        self._exit.set()
-        # if self._proc and self._proc.poll() is None:
-        #     asyncio.get_running_loop().run_in_executor(None, self._proc.communicate)
-        print("1")
-        self.__queue_out.close()
-        self.__queue_in.close()
-        print("2")
-        await self.__queue_out.wait_closed()
-        await self.__queue_in.wait_closed()
-        print("3")
-        # await executor job
-        self.__proc_task.cancel()
-        # await self.__proc_task
-        print("4")
-
-        LOGGER.debug("[%s] Cleanup finished", self._id)
-        return True
-
-    async def iterate_chunks(self) -> AsyncGenerator[bytes, None]:
-        """Yield chunks from the output Queue. Generator."""
-        LOGGER.debug("[%s] start reading from generator", self._id)
-        while not self._exit.is_set():
-            chunk = await self.__queue_out.async_q.get()
-            self.__queue_out.async_q.task_done()
-            if not chunk:
-                break
-            yield chunk
-        LOGGER.debug("[%s] finished reading from generator", self._id)
-
-    async def read(self) -> bytes:
-        """Read single chunk from the output Queue."""
-        chunk = await self.__queue_out.async_q.get()
-        self.__queue_out.async_q.task_done()
-        return chunk
-
-    async def write(self, data: Optional[bytes] = None) -> None:
-        """Write data to process."""
-        if not self._exit.is_set():
-            await self.__queue_in.async_q.put(data)
-
-    async def write_eof(self) -> None:
-        """Write eof to process stdin."""
-        await self.write(b"")
+        await self.__queue_in.put(b"")
 
     async def communicate(self, input_data: Optional[bytes] = None) -> bytes:
         """Write bytes to process and read back results."""
@@ -239,54 +114,75 @@ class AsyncProcessWithQueues(object):
             output += chunk
         return output
 
-    def _run(self):
-        """Run actual process in executor thread."""
-        LOGGER.info(
-            "[%s] Starting process with args: %s", self._id, str(self._process_args)
-        )
-        proc = subprocess.Popen(
-            self._process_args,
-            **{
-                "shell": self._enable_shell,
-                "stdout": subprocess.PIPE,
-                "stdin": subprocess.PIPE if self._enable_write else None,
-            },
-        )
-
-        # start fill buffer task in (yet another) background thread
-        def fill_buffer():
-            LOGGER.debug("[%s] start fill buffer", self._id)
-            try:
-                while not self._exit.is_set() and not self.__queue_in.closed:
-                    chunk = self.__queue_in.sync_q.get()
-                    if not chunk:
-                        break
-                    proc.stdin.write(chunk)
-            except Exception as exc:  # pylint: disable=broad-except
-                LOGGER.debug("[%s], fill buffer aborted (%s)", self._id, str(exc))
-            else:
-                LOGGER.debug("[%s] fill buffer finished", self._id)
-
-        if self._enable_write:
-            fill_buffer_thread = threading.Thread(
-                target=fill_buffer, name=f"AsyncProcess_{self._id}"
-            )
-            fill_buffer_thread.start()
-
-        # consume bytes from stdout
+    def __run_proc(self):
+        """Run process in executor."""
         try:
-            while not self._exit.is_set() and not self.__queue_out.closed:
-                chunk = proc.stdout.read(self._chunksize)
-                self.__queue_out.sync_q.put(chunk)
-                if len(chunk) < self._chunksize:
-                    LOGGER.debug("[%s] last chunk received on stdout", self._id)
-                    break
+            LOGGER.info(
+                "[%s] Starting process with args: %s", self._id, str(self._process_args)
+            )
+            proc = subprocess.Popen(
+                self._process_args,
+                shell=self._enable_shell,
+                stdout=subprocess.PIPE,
+                stdin=subprocess.PIPE if self._enable_write else None,
+            )
             if self._enable_write:
-                fill_buffer_thread.join()
-            # write empty chunk to out queue to indicate end of stream just in case
-            self.__queue_out.sync_q.put(b"")
+                threading.Thread(
+                    target=self.__write_stdin,
+                    args=(proc.stdin,),
+                    name=f"AsyncProcess_{self._id}_write_stdin",
+                    daemon=True,
+                ).start()
+            threading.Thread(
+                target=self.__read_stdout,
+                args=(proc.stdout,),
+                name=f"AsyncProcess_{self._id}_read_stdout",
+                daemon=True,
+            ).start()
+            proc.wait()
+
+        except Exception as exc:  # pylint: disable=broad-except
+            LOGGER.exception(exc)
         finally:
-            LOGGER.info("[%s] wait for process exit", self._id)
-            # pickup remaining bytes if process is stull running
+            LOGGER.error("[%s] process exiting", self._id)
             if proc.poll() is None:
+                proc.terminate()
                 proc.communicate()
+            LOGGER.debug("[%s] process finished", self._id)
+
+    def __write_stdin(self, _stdin):
+        """Put chunks from queue to stdin."""
+        LOGGER.debug("[%s] start write_stdin", self._id)
+        try:
+            while True:
+                chunk = asyncio.run_coroutine_threadsafe(
+                    self.__queue_in.get(), self.loop
+                ).result()
+                self.__queue_in.task_done()
+                if not chunk:
+                    _stdin.close()
+                    break
+                _stdin.write(chunk)
+        except Exception as exc:  # pylint: disable=broad-except
+            LOGGER.debug("[%s] write_stdin aborted (%s)", self._id, str(exc))
+        else:
+            LOGGER.debug("[%s] write_stdin finished", self._id)
+
+    def __read_stdout(self, _stdout):
+        """Put chunks from stdout to queue."""
+        LOGGER.debug("[%s] start read_stdout", self._id)
+        try:
+            while True:
+                chunk = _stdout.read(self._chunksize)
+                asyncio.run_coroutine_threadsafe(
+                    self.__queue_out.put(chunk), self.loop
+                ).result()
+                if not chunk or len(chunk) < self._chunksize:
+                    LOGGER.debug("[%s] last chunk received on stdout", self._id)
+                    break
+            # write empty chunk just in case
+            asyncio.run_coroutine_threadsafe(self.__queue_out.put(b""), self.loop)
+        except Exception as exc:  # pylint: disable=broad-except
+            LOGGER.debug("[%s] read_stdout aborted (%s)", self._id, str(exc))
+        else:
+            LOGGER.debug("[%s] read_stdout finished", self._id)
index 1e8285edaa3574997ebaeb8d7093895182d9a134..6657991bba482abd7a35192c86217fe8e93332dd 100755 (executable)
@@ -457,7 +457,10 @@ class ProviderSettings(ConfigBaseItem):
     def all_keys(self):
         """Return all possible keys of this Config object."""
         prov_type = PROVIDER_TYPE_MAPPINGS[self.conf_key]
-        return (item.id for item in self.mass.get_providers(prov_type))
+        return (
+            item.id
+            for item in self.mass.get_providers(prov_type, include_unavailable=True)
+        )
 
     def get_config_entries(self, child_key: str) -> List[ConfigEntry]:
         """Return all config entries for the given provider."""
index 49408d83b2c4c589a31369c28766147123cf9a28..0f86b36b6b30af23c55cbd0062bf2a34c4276b65 100755 (executable)
@@ -1115,7 +1115,7 @@ class MusicManager:
             ):
                 # get streamdetails from provider
                 music_prov = self.mass.get_provider(prov_media.provider)
-                if not music_prov:
+                if not music_prov or not music_prov.available:
                     continue  # provider temporary unavailable ?
 
                 streamdetails = await music_prov.async_get_stream_details(
index 9ccdaa05ba796987b8c81f8dcf8ea922b4379c94..b9599bcb728fd8fa060a6565c75c82f6a7e2d51a 100755 (executable)
@@ -93,6 +93,8 @@ class StreamManager:
         )
         async with AsyncProcess(args, chunk_size, enable_write=True) as sox_proc:
 
+            cancelled = False
+
             async def fill_buffer():
                 """Forward audio chunks to sox stdin."""
                 LOGGER.debug(
@@ -102,6 +104,8 @@ class StreamManager:
                 )
                 # feed audio data into sox stdin for processing
                 async for chunk in self.async_get_media_stream(streamdetails):
+                    if self.mass.exit or cancelled:
+                        break
                     await sox_proc.write(chunk)
                 await sox_proc.write_eof()
                 LOGGER.debug(
@@ -113,23 +117,34 @@ class StreamManager:
             fill_buffer_task = self.mass.loop.create_task(fill_buffer())
             # yield chunks from stdout
             # we keep 1 chunk behind to detect end of stream properly
-            prev_chunk = b""
-            async for chunk in sox_proc.iterate_chunks():
-                if len(chunk) < chunk_size:
-                    # last chunk
-                    yield (True, prev_chunk + chunk)
-                    break
-                if prev_chunk:
-                    yield (False, prev_chunk)
-                prev_chunk = chunk
+            try:
+                prev_chunk = b""
+                async for chunk in sox_proc.iterate_chunks():
+                    if len(chunk) < chunk_size:
+                        # last chunk
+                        yield (True, prev_chunk + chunk)
+                        break
+                    if prev_chunk:
+                        yield (False, prev_chunk)
+                    prev_chunk = chunk
 
-            await asyncio.wait([fill_buffer_task])
+                await asyncio.wait([fill_buffer_task])
 
-            LOGGER.debug(
-                "[async_get_sox_stream] [%s/%s] finished",
-                streamdetails.provider,
-                streamdetails.item_id,
-            )
+            except (GeneratorExit, Exception) as exc:  # pylint: disable=broad-except
+                cancelled = True
+                fill_buffer_task.cancel()
+                LOGGER.debug(
+                    "[async_get_sox_stream] [%s/%s] cancelled",
+                    streamdetails.provider,
+                    streamdetails.item_id,
+                )
+                raise exc
+            else:
+                LOGGER.debug(
+                    "[async_get_sox_stream] [%s/%s] finished",
+                    streamdetails.provider,
+                    streamdetails.item_id,
+                )
 
     async def async_queue_stream_flac(self, player_id) -> AsyncGenerator[bytes, None]:
         """Stream the PlayerQueue's tracks as constant feed in flac format."""
@@ -145,14 +160,16 @@ class StreamManager:
             )
 
             # feed stdin with pcm samples
+            cancelled = False
+
             async def fill_buffer():
                 """Feed audio data into sox stdin for processing."""
                 LOGGER.debug(
                     "[async_queue_stream_flac] [%s] fill buffer started", player_id
                 )
                 async for chunk in self.async_queue_stream_pcm(player_id, 96000, 32):
-                    if self.mass.exit:
-                        return
+                    if self.mass.exit or cancelled:
+                        break
                     await sox_proc.write(chunk)
                 # write eof when no more data
                 await sox_proc.write_eof()
@@ -161,14 +178,24 @@ class StreamManager:
                 )
 
             fill_buffer_task = self.mass.loop.create_task(fill_buffer())
-            # start yielding audio chunks
-            async for chunk in sox_proc.iterate_chunks():
-                yield chunk
-            await asyncio.wait([fill_buffer_task])
-            LOGGER.debug(
-                "[async_queue_stream_flac] [%s] finished",
-                player_id,
-            )
+            try:
+                # start yielding audio chunks
+                async for chunk in sox_proc.iterate_chunks():
+                    yield chunk
+                await asyncio.wait([fill_buffer_task])
+            except (GeneratorExit, Exception) as exc:  # pylint: disable=broad-except
+                cancelled = True
+                fill_buffer_task.cancel()
+                LOGGER.debug(
+                    "[async_queue_stream_flac] [%s] cancelled",
+                    player_id,
+                )
+                raise exc
+            else:
+                LOGGER.debug(
+                    "[async_queue_stream_flac] [%s] finished",
+                    player_id,
+                )
 
     async def async_queue_stream_pcm(
         self, player_id, sample_rate=96000, bit_depth=32
index a087d8878536f6f6b50c4312669b9bc21c2d5a9e..a48d0e08c5c9c8cd4e9ea21a45b962082e562843 100644 (file)
@@ -196,13 +196,16 @@ class MusicAssistant:
 
     @callback
     def get_providers(
-        self, filter_type: Optional[ProviderType] = None
+        self,
+        filter_type: Optional[ProviderType] = None,
+        include_unavailable: bool = False,
     ) -> List[Provider]:
         """Return all providers, optionally filtered by type."""
         return [
             item
             for item in self._providers.values()
-            if (filter_type is None or item.type == filter_type) and item.available
+            if (filter_type is None or item.type == filter_type)
+            and (include_unavailable or item.available)
         ]
 
     @callback
index 2eb0228872d0ff7b89b5fd48ddb7b129ec94dff8..39d225745f26128b51237fe5d8795d6fdbcfe7f7 100644 (file)
@@ -61,6 +61,8 @@ class SpotifyProvider(MusicProvider):
 
     __auth_token = None
     sp_user = None
+    _username = None
+    _password = None
 
     @property
     def id(self) -> str:
index ae5c3b10ef226bc5670f6121093c2a7bd64a4908..bac9e3b307be245f6ce7731a0c7a6d677ab77a30 100644 (file)
@@ -70,6 +70,8 @@ async def async_player_queue_item(request: Request):
     player_id = request.match_info.get("player_id")
     item_id = request.match_info.get("queue_item")
     player_queue = request.app["mass"].players.get_player_queue(player_id)
+    if not player_queue:
+        return Response(text="invalid player", status=404)
     try:
         item_id = int(item_id)
         queue_item = player_queue.get_item(item_id)
@@ -84,6 +86,8 @@ async def async_player_queue_items(request: Request):
     """Return the items in the player's queue."""
     player_id = request.match_info.get("player_id")
     player_queue = request.app["mass"].players.get_player_queue(player_id)
+    if not player_queue:
+        return Response(text="invalid player", status=404)
 
     async def async_queue_tracks_iter():
         for item in player_queue.items:
@@ -98,6 +102,8 @@ async def async_player_queue(request: Request):
     """Return the player queue details."""
     player_id = request.match_info.get("player_id")
     player_queue = request.app["mass"].players.get_player_queue(player_id)
+    if not player_queue:
+        return Response(text="invalid player", status=404)
     return Response(
         body=json_serializer(player_queue.to_dict()), content_type="application/json"
     )