From: Marcel van der Veldt Date: Mon, 28 Dec 2020 13:44:08 +0000 (+0100) Subject: final fix for proper resources cleanup X-Git-Url: https://git.kitaultman.com/?a=commitdiff_plain;h=8fe0852ec9729974d713de266c2c7d1b602a8a1b;p=music-assistant-server.git final fix for proper resources cleanup --- diff --git a/music_assistant/helpers/process.py b/music_assistant/helpers/process.py index e4bafc53..7c029272 100644 --- a/music_assistant/helpers/process.py +++ b/music_assistant/helpers/process.py @@ -30,6 +30,7 @@ class AsyncProcess: self, process_args: List, enable_write: bool = False, enable_shell=False ): """Initialize.""" + self._id = "".join(process_args) self._proc = subprocess.Popen( process_args, shell=enable_shell, @@ -49,20 +50,16 @@ class AsyncProcess: """Exit context manager.""" self._cancelled = True if self._proc.poll() is None: - # prevent subprocess deadlocking, send terminate and read remaining bytes - def close_proc(): - self._proc.terminate() - self._proc.stdin.close() - self._proc.stdout.read(-1) + # process needs to be cleaned up.. + await self.loop.run_in_executor(None, self.__close) - await self.loop.run_in_executor(None, close_proc) return exc_type not in (GeneratorExit, asyncio.CancelledError) async def iterate_chunks( self, chunksize: int = DEFAULT_CHUNKSIZE ) -> AsyncGenerator[bytes, None]: """Yield chunks from the process stdout. Generator.""" - while True: + while not self._cancelled: chunk = await self.read(chunksize) yield chunk if len(chunk) < chunksize: @@ -75,41 +72,17 @@ class AsyncProcess: raise asyncio.CancelledError() return await self.loop.run_in_executor(None, self.__read, chunksize) - def __read(self, chunksize: int = DEFAULT_CHUNKSIZE): - """Try read chunk from process.""" - try: - return self._proc.stdout.read(chunksize) - except (BrokenPipeError, ValueError, AttributeError): - # Process already exited - return b"" - async def write(self, data: bytes) -> None: """Write data to process stdin.""" if self._cancelled: raise asyncio.CancelledError() - - def __write(): - try: - self._proc.stdin.write(data) - except (BrokenPipeError, ValueError, AttributeError): - # Process already exited - pass - - await self.loop.run_in_executor(None, __write) + await self.loop.run_in_executor(None, self.__write, data) async def write_eof(self) -> None: """Write eof to process.""" if self._cancelled: raise asyncio.CancelledError() - - def __write_eof(): - try: - self._proc.stdin.close() - except (BrokenPipeError, ValueError, AttributeError): - # Process already exited - pass - - await self.loop.run_in_executor(None, __write_eof) + await self.loop.run_in_executor(None, self.__write_eof) async def communicate(self, input_data: Optional[bytes] = None) -> bytes: """Write bytes to process and read back results.""" @@ -117,6 +90,48 @@ class AsyncProcess: raise asyncio.CancelledError() return await self.loop.run_in_executor(None, self._proc.communicate, input_data) + def __read(self, chunksize: int = DEFAULT_CHUNKSIZE): + """Try read chunk from process.""" + try: + chunk = self._proc.stdout.read(chunksize) + self._proc.stdout.flush() + return chunk + except (BrokenPipeError, ValueError, AttributeError): + # Process already exited + return b"" + + def __write(self, data: bytes): + """Write data to process stdin.""" + try: + self._proc.stdin.write(data) + except (BrokenPipeError, ValueError, AttributeError): + # Process already exited + pass + + def __write_eof(self): + """Write eof to process stdin.""" + try: + self._proc.stdin.close() + except (BrokenPipeError, ValueError, AttributeError): + # Process already exited + pass + + def __close(self): + """Prevent subprocess deadlocking, make sure it closes.""" + LOGGER.debug("Cleaning up process %s...", self._id) + try: + self._proc.stdout.close() + except BrokenPipeError: + pass + if self._proc.stdin: + try: + self._proc.stdin.close() + except BrokenPipeError: + pass + self._proc.kill() + self._proc.wait() + LOGGER.debug("Process %s closed.", self._id) + class AsyncProcessBroken: """Implementation of a (truly) non blocking subprocess.""" diff --git a/music_assistant/managers/streams.py b/music_assistant/managers/streams.py index 627ed0b1..202dd5f9 100755 --- a/music_assistant/managers/streams.py +++ b/music_assistant/managers/streams.py @@ -139,8 +139,7 @@ class StreamManager: fill_buffer_task = self.mass.loop.create_task(fill_buffer()) # start yielding audio chunks - chunk_size = sample_rate * 4 * 2 * 10 - async for chunk in sox_proc.iterate_chunks(chunk_size): + async for chunk in sox_proc.iterate_chunks(): yield chunk await asyncio.wait([fill_buffer_task])