final fix for proper resources cleanup
authorMarcel van der Veldt <m.vanderveldt@outlook.com>
Mon, 28 Dec 2020 13:44:08 +0000 (14:44 +0100)
committerMarcel van der Veldt <m.vanderveldt@outlook.com>
Mon, 28 Dec 2020 13:44:08 +0000 (14:44 +0100)
music_assistant/helpers/process.py
music_assistant/managers/streams.py

index e4bafc539cfaf63dbdbeab8dc5e1c24ec7231129..7c029272caacedf4ead12b3e8435d9ec2b5d46c5 100644 (file)
@@ -30,6 +30,7 @@ class AsyncProcess:
         self, process_args: List, enable_write: bool = False, enable_shell=False
     ):
         """Initialize."""
+        self._id = "".join(process_args)
         self._proc = subprocess.Popen(
             process_args,
             shell=enable_shell,
@@ -49,20 +50,16 @@ class AsyncProcess:
         """Exit context manager."""
         self._cancelled = True
         if self._proc.poll() is None:
-            # prevent subprocess deadlocking, send terminate and read remaining bytes
-            def close_proc():
-                self._proc.terminate()
-                self._proc.stdin.close()
-                self._proc.stdout.read(-1)
+            # process needs to be cleaned up..
+            await self.loop.run_in_executor(None, self.__close)
 
-            await self.loop.run_in_executor(None, close_proc)
         return exc_type not in (GeneratorExit, asyncio.CancelledError)
 
     async def iterate_chunks(
         self, chunksize: int = DEFAULT_CHUNKSIZE
     ) -> AsyncGenerator[bytes, None]:
         """Yield chunks from the process stdout. Generator."""
-        while True:
+        while not self._cancelled:
             chunk = await self.read(chunksize)
             yield chunk
             if len(chunk) < chunksize:
@@ -75,41 +72,17 @@ class AsyncProcess:
             raise asyncio.CancelledError()
         return await self.loop.run_in_executor(None, self.__read, chunksize)
 
-    def __read(self, chunksize: int = DEFAULT_CHUNKSIZE):
-        """Try read chunk from process."""
-        try:
-            return self._proc.stdout.read(chunksize)
-        except (BrokenPipeError, ValueError, AttributeError):
-            # Process already exited
-            return b""
-
     async def write(self, data: bytes) -> None:
         """Write data to process stdin."""
         if self._cancelled:
             raise asyncio.CancelledError()
-
-        def __write():
-            try:
-                self._proc.stdin.write(data)
-            except (BrokenPipeError, ValueError, AttributeError):
-                # Process already exited
-                pass
-
-        await self.loop.run_in_executor(None, __write)
+        await self.loop.run_in_executor(None, self.__write, data)
 
     async def write_eof(self) -> None:
         """Write eof to process."""
         if self._cancelled:
             raise asyncio.CancelledError()
-
-        def __write_eof():
-            try:
-                self._proc.stdin.close()
-            except (BrokenPipeError, ValueError, AttributeError):
-                # Process already exited
-                pass
-
-        await self.loop.run_in_executor(None, __write_eof)
+        await self.loop.run_in_executor(None, self.__write_eof)
 
     async def communicate(self, input_data: Optional[bytes] = None) -> bytes:
         """Write bytes to process and read back results."""
@@ -117,6 +90,48 @@ class AsyncProcess:
             raise asyncio.CancelledError()
         return await self.loop.run_in_executor(None, self._proc.communicate, input_data)
 
+    def __read(self, chunksize: int = DEFAULT_CHUNKSIZE):
+        """Try read chunk from process."""
+        try:
+            chunk = self._proc.stdout.read(chunksize)
+            self._proc.stdout.flush()
+            return chunk
+        except (BrokenPipeError, ValueError, AttributeError):
+            # Process already exited
+            return b""
+
+    def __write(self, data: bytes):
+        """Write data to process stdin."""
+        try:
+            self._proc.stdin.write(data)
+        except (BrokenPipeError, ValueError, AttributeError):
+            # Process already exited
+            pass
+
+    def __write_eof(self):
+        """Write eof to process stdin."""
+        try:
+            self._proc.stdin.close()
+        except (BrokenPipeError, ValueError, AttributeError):
+            # Process already exited
+            pass
+
+    def __close(self):
+        """Prevent subprocess deadlocking, make sure it closes."""
+        LOGGER.debug("Cleaning up process %s...", self._id)
+        try:
+            self._proc.stdout.close()
+        except BrokenPipeError:
+            pass
+        if self._proc.stdin:
+            try:
+                self._proc.stdin.close()
+            except BrokenPipeError:
+                pass
+        self._proc.kill()
+        self._proc.wait()
+        LOGGER.debug("Process %s closed.", self._id)
+
 
 class AsyncProcessBroken:
     """Implementation of a (truly) non blocking subprocess."""
index 627ed0b173347a5ad1acc08dbaf0c14bb1cbe243..202dd5f935d0d14c390626c15c331b6811c562a6 100755 (executable)
@@ -139,8 +139,7 @@ class StreamManager:
             fill_buffer_task = self.mass.loop.create_task(fill_buffer())
 
             # start yielding audio chunks
-            chunk_size = sample_rate * 4 * 2 * 10
-            async for chunk in sox_proc.iterate_chunks(chunk_size):
+            async for chunk in sox_proc.iterate_chunks():
                 yield chunk
             await asyncio.wait([fill_buffer_task])