From a034877f0e71f65427d0dc69e134a591f96622c4 Mon Sep 17 00:00:00 2001 From: Marcel van der Veldt Date: Wed, 7 Oct 2020 12:25:39 +0200 Subject: [PATCH] final fix for the asyncprocess --- music_assistant/helpers/process.py | 290 ++++++------------ music_assistant/managers/config.py | 5 +- music_assistant/managers/music.py | 2 +- music_assistant/managers/streams.py | 77 +++-- music_assistant/mass.py | 7 +- music_assistant/providers/spotify/__init__.py | 2 + music_assistant/web/endpoints/players.py | 6 + 7 files changed, 163 insertions(+), 226 deletions(-) diff --git a/music_assistant/helpers/process.py b/music_assistant/helpers/process.py index d5e857ba..3fddfc2b 100644 --- a/music_assistant/helpers/process.py +++ b/music_assistant/helpers/process.py @@ -36,21 +36,17 @@ class AsyncProcess(object): self._chunksize = chunksize self._enable_write = enable_write self._enable_shell = enable_shell + self.loop = asyncio.get_running_loop() + self.__queue_in = asyncio.Queue(4) + self.__queue_out = asyncio.Queue(8) + self.__proc_task = None self._exit = False - self._proc = None self._id = int(time.time()) # some identifier for logging async def __aenter__(self) -> "AsyncProcess": """Enter context manager, start running the process in executor.""" LOGGER.debug("[%s] Entered context manager", self._id) - self._proc = subprocess.Popen( - self._process_args, - **{ - "shell": self._enable_shell, - "stdout": subprocess.PIPE, - "stdin": subprocess.PIPE if self._enable_write else None, - }, - ) + self.__proc_task = self.loop.run_in_executor(None, self.__run_proc) return self async def __aexit__(self, exc_type, exc_value, traceback) -> bool: @@ -64,168 +60,47 @@ class AsyncProcess(object): ) else: LOGGER.debug("[%s] Context manager exit", self._id) - # signal that we must exit - self._exit = True - - def close_proc(): - if self._proc and self._proc.poll() is None: - # there is no real clean way to do this with all the blocking pipes - self._proc.kill() - await asyncio.get_running_loop().run_in_executor(None, close_proc) + self._exit = True + # prevent a deadlock by clearing the queues + while self.__queue_in.qsize(): + await self.__queue_in.get() + self.__queue_in.task_done() + self.__queue_in.put_nowait(b"") + while self.__queue_out.qsize(): + await self.__queue_out.get() + self.__queue_out.task_done() + await self.__proc_task LOGGER.debug("[%s] Cleanup finished", self._id) return True async def iterate_chunks(self) -> AsyncGenerator[bytes, None]: """Yield chunks from the output Queue. Generator.""" LOGGER.debug("[%s] start reading from generator", self._id) - while not self._exit: + while True: chunk = await self.read() yield chunk - if len(chunk) < self._chunksize: + if not chunk or len(chunk) < self._chunksize: break LOGGER.debug("[%s] finished reading from generator", self._id) async def read(self) -> bytes: """Read single chunk from the output Queue.""" - - def try_read(): - try: - data = self._proc.stdout.read(self._chunksize) - return data - except BrokenPipeError: - return b"" - except Exception as exc: # pylint: disable=broad-except - LOGGER.exception(exc) - return b"" - - return await asyncio.get_running_loop().run_in_executor(None, try_read) + if self._exit: + raise RuntimeError("Already exited") + data = await self.__queue_out.get() + self.__queue_out.task_done() + return data async def write(self, data: bytes) -> None: """Write data to process.""" - - def try_write(_data): - try: - self._proc.stdin.write(_data) - except BrokenPipeError: - pass - except Exception as exc: # pylint: disable=broad-except - LOGGER.exception(exc) - - await asyncio.get_running_loop().run_in_executor(None, try_write, data) + if self._exit: + raise RuntimeError("Already exited") + await self.__queue_in.put(data) async def write_eof(self) -> None: """Write eof to process.""" - - def try_write(): - try: - self._proc.stdin.close() - except BrokenPipeError: - pass - except Exception as exc: # pylint: disable=broad-except - LOGGER.exception(exc) - - await asyncio.get_running_loop().run_in_executor(None, try_write) - - async def communicate(self, input_data: Optional[bytes] = None) -> bytes: - """Write bytes to process and read back results.""" - if not self._enable_write and input_data: - raise RuntimeError("Write is disabled") - if input_data: - await self.write(input_data) - output = b"" - async for chunk in self.iterate_chunks(): - output += chunk - return output - - -# first attempt with queues, too complicated -# left here as reference -class AsyncProcessWithQueues(object): - """Implementation of a (truly) non blocking subprocess.""" - - def __init__( - self, - process_args: List, - chunksize=512000, - enable_write: bool = False, - enable_shell=False, - ): - """Initialize.""" - self._process_args = process_args - self._chunksize = chunksize - self._enable_write = enable_write - self._enable_shell = enable_shell - # we have large chunks, limit the queue size a bit. - import janus - - self.__queue_in = janus.Queue(8) - self.__queue_out = janus.Queue(4) - self.__proc_task = None - self._exit = threading.Event() - self._id = int(time.time()) # some identifier for logging - - async def __aenter__(self) -> "AsyncProcess": - """Enter context manager, start running the process in executor.""" - LOGGER.debug("[%s] Entered context manager", self._id) - self.__proc_task = asyncio.get_running_loop().run_in_executor(None, self._run) - return self - - async def __aexit__(self, exc_type, exc_value, traceback) -> bool: - """Exit context manager.""" - if exc_type: - LOGGER.debug( - "[%s] Context manager exit with exception %s (%s)", - self._id, - exc_type, - str(exc_value), - ) - else: - LOGGER.debug("[%s] Context manager exit", self._id) - # signal that we must exit - self._exit.set() - # if self._proc and self._proc.poll() is None: - # asyncio.get_running_loop().run_in_executor(None, self._proc.communicate) - print("1") - self.__queue_out.close() - self.__queue_in.close() - print("2") - await self.__queue_out.wait_closed() - await self.__queue_in.wait_closed() - print("3") - # await executor job - self.__proc_task.cancel() - # await self.__proc_task - print("4") - - LOGGER.debug("[%s] Cleanup finished", self._id) - return True - - async def iterate_chunks(self) -> AsyncGenerator[bytes, None]: - """Yield chunks from the output Queue. Generator.""" - LOGGER.debug("[%s] start reading from generator", self._id) - while not self._exit.is_set(): - chunk = await self.__queue_out.async_q.get() - self.__queue_out.async_q.task_done() - if not chunk: - break - yield chunk - LOGGER.debug("[%s] finished reading from generator", self._id) - - async def read(self) -> bytes: - """Read single chunk from the output Queue.""" - chunk = await self.__queue_out.async_q.get() - self.__queue_out.async_q.task_done() - return chunk - - async def write(self, data: Optional[bytes] = None) -> None: - """Write data to process.""" - if not self._exit.is_set(): - await self.__queue_in.async_q.put(data) - - async def write_eof(self) -> None: - """Write eof to process stdin.""" - await self.write(b"") + await self.__queue_in.put(b"") async def communicate(self, input_data: Optional[bytes] = None) -> bytes: """Write bytes to process and read back results.""" @@ -239,54 +114,75 @@ class AsyncProcessWithQueues(object): output += chunk return output - def _run(self): - """Run actual process in executor thread.""" - LOGGER.info( - "[%s] Starting process with args: %s", self._id, str(self._process_args) - ) - proc = subprocess.Popen( - self._process_args, - **{ - "shell": self._enable_shell, - "stdout": subprocess.PIPE, - "stdin": subprocess.PIPE if self._enable_write else None, - }, - ) - - # start fill buffer task in (yet another) background thread - def fill_buffer(): - LOGGER.debug("[%s] start fill buffer", self._id) - try: - while not self._exit.is_set() and not self.__queue_in.closed: - chunk = self.__queue_in.sync_q.get() - if not chunk: - break - proc.stdin.write(chunk) - except Exception as exc: # pylint: disable=broad-except - LOGGER.debug("[%s], fill buffer aborted (%s)", self._id, str(exc)) - else: - LOGGER.debug("[%s] fill buffer finished", self._id) - - if self._enable_write: - fill_buffer_thread = threading.Thread( - target=fill_buffer, name=f"AsyncProcess_{self._id}" - ) - fill_buffer_thread.start() - - # consume bytes from stdout + def __run_proc(self): + """Run process in executor.""" try: - while not self._exit.is_set() and not self.__queue_out.closed: - chunk = proc.stdout.read(self._chunksize) - self.__queue_out.sync_q.put(chunk) - if len(chunk) < self._chunksize: - LOGGER.debug("[%s] last chunk received on stdout", self._id) - break + LOGGER.info( + "[%s] Starting process with args: %s", self._id, str(self._process_args) + ) + proc = subprocess.Popen( + self._process_args, + shell=self._enable_shell, + stdout=subprocess.PIPE, + stdin=subprocess.PIPE if self._enable_write else None, + ) if self._enable_write: - fill_buffer_thread.join() - # write empty chunk to out queue to indicate end of stream just in case - self.__queue_out.sync_q.put(b"") + threading.Thread( + target=self.__write_stdin, + args=(proc.stdin,), + name=f"AsyncProcess_{self._id}_write_stdin", + daemon=True, + ).start() + threading.Thread( + target=self.__read_stdout, + args=(proc.stdout,), + name=f"AsyncProcess_{self._id}_read_stdout", + daemon=True, + ).start() + proc.wait() + + except Exception as exc: # pylint: disable=broad-except + LOGGER.exception(exc) finally: - LOGGER.info("[%s] wait for process exit", self._id) - # pickup remaining bytes if process is stull running + LOGGER.error("[%s] process exiting", self._id) if proc.poll() is None: + proc.terminate() proc.communicate() + LOGGER.debug("[%s] process finished", self._id) + + def __write_stdin(self, _stdin): + """Put chunks from queue to stdin.""" + LOGGER.debug("[%s] start write_stdin", self._id) + try: + while True: + chunk = asyncio.run_coroutine_threadsafe( + self.__queue_in.get(), self.loop + ).result() + self.__queue_in.task_done() + if not chunk: + _stdin.close() + break + _stdin.write(chunk) + except Exception as exc: # pylint: disable=broad-except + LOGGER.debug("[%s] write_stdin aborted (%s)", self._id, str(exc)) + else: + LOGGER.debug("[%s] write_stdin finished", self._id) + + def __read_stdout(self, _stdout): + """Put chunks from stdout to queue.""" + LOGGER.debug("[%s] start read_stdout", self._id) + try: + while True: + chunk = _stdout.read(self._chunksize) + asyncio.run_coroutine_threadsafe( + self.__queue_out.put(chunk), self.loop + ).result() + if not chunk or len(chunk) < self._chunksize: + LOGGER.debug("[%s] last chunk received on stdout", self._id) + break + # write empty chunk just in case + asyncio.run_coroutine_threadsafe(self.__queue_out.put(b""), self.loop) + except Exception as exc: # pylint: disable=broad-except + LOGGER.debug("[%s] read_stdout aborted (%s)", self._id, str(exc)) + else: + LOGGER.debug("[%s] read_stdout finished", self._id) diff --git a/music_assistant/managers/config.py b/music_assistant/managers/config.py index 1e8285ed..6657991b 100755 --- a/music_assistant/managers/config.py +++ b/music_assistant/managers/config.py @@ -457,7 +457,10 @@ class ProviderSettings(ConfigBaseItem): def all_keys(self): """Return all possible keys of this Config object.""" prov_type = PROVIDER_TYPE_MAPPINGS[self.conf_key] - return (item.id for item in self.mass.get_providers(prov_type)) + return ( + item.id + for item in self.mass.get_providers(prov_type, include_unavailable=True) + ) def get_config_entries(self, child_key: str) -> List[ConfigEntry]: """Return all config entries for the given provider.""" diff --git a/music_assistant/managers/music.py b/music_assistant/managers/music.py index 49408d83..0f86b36b 100755 --- a/music_assistant/managers/music.py +++ b/music_assistant/managers/music.py @@ -1115,7 +1115,7 @@ class MusicManager: ): # get streamdetails from provider music_prov = self.mass.get_provider(prov_media.provider) - if not music_prov: + if not music_prov or not music_prov.available: continue # provider temporary unavailable ? streamdetails = await music_prov.async_get_stream_details( diff --git a/music_assistant/managers/streams.py b/music_assistant/managers/streams.py index 9ccdaa05..b9599bcb 100755 --- a/music_assistant/managers/streams.py +++ b/music_assistant/managers/streams.py @@ -93,6 +93,8 @@ class StreamManager: ) async with AsyncProcess(args, chunk_size, enable_write=True) as sox_proc: + cancelled = False + async def fill_buffer(): """Forward audio chunks to sox stdin.""" LOGGER.debug( @@ -102,6 +104,8 @@ class StreamManager: ) # feed audio data into sox stdin for processing async for chunk in self.async_get_media_stream(streamdetails): + if self.mass.exit or cancelled: + break await sox_proc.write(chunk) await sox_proc.write_eof() LOGGER.debug( @@ -113,23 +117,34 @@ class StreamManager: fill_buffer_task = self.mass.loop.create_task(fill_buffer()) # yield chunks from stdout # we keep 1 chunk behind to detect end of stream properly - prev_chunk = b"" - async for chunk in sox_proc.iterate_chunks(): - if len(chunk) < chunk_size: - # last chunk - yield (True, prev_chunk + chunk) - break - if prev_chunk: - yield (False, prev_chunk) - prev_chunk = chunk + try: + prev_chunk = b"" + async for chunk in sox_proc.iterate_chunks(): + if len(chunk) < chunk_size: + # last chunk + yield (True, prev_chunk + chunk) + break + if prev_chunk: + yield (False, prev_chunk) + prev_chunk = chunk - await asyncio.wait([fill_buffer_task]) + await asyncio.wait([fill_buffer_task]) - LOGGER.debug( - "[async_get_sox_stream] [%s/%s] finished", - streamdetails.provider, - streamdetails.item_id, - ) + except (GeneratorExit, Exception) as exc: # pylint: disable=broad-except + cancelled = True + fill_buffer_task.cancel() + LOGGER.debug( + "[async_get_sox_stream] [%s/%s] cancelled", + streamdetails.provider, + streamdetails.item_id, + ) + raise exc + else: + LOGGER.debug( + "[async_get_sox_stream] [%s/%s] finished", + streamdetails.provider, + streamdetails.item_id, + ) async def async_queue_stream_flac(self, player_id) -> AsyncGenerator[bytes, None]: """Stream the PlayerQueue's tracks as constant feed in flac format.""" @@ -145,14 +160,16 @@ class StreamManager: ) # feed stdin with pcm samples + cancelled = False + async def fill_buffer(): """Feed audio data into sox stdin for processing.""" LOGGER.debug( "[async_queue_stream_flac] [%s] fill buffer started", player_id ) async for chunk in self.async_queue_stream_pcm(player_id, 96000, 32): - if self.mass.exit: - return + if self.mass.exit or cancelled: + break await sox_proc.write(chunk) # write eof when no more data await sox_proc.write_eof() @@ -161,14 +178,24 @@ class StreamManager: ) fill_buffer_task = self.mass.loop.create_task(fill_buffer()) - # start yielding audio chunks - async for chunk in sox_proc.iterate_chunks(): - yield chunk - await asyncio.wait([fill_buffer_task]) - LOGGER.debug( - "[async_queue_stream_flac] [%s] finished", - player_id, - ) + try: + # start yielding audio chunks + async for chunk in sox_proc.iterate_chunks(): + yield chunk + await asyncio.wait([fill_buffer_task]) + except (GeneratorExit, Exception) as exc: # pylint: disable=broad-except + cancelled = True + fill_buffer_task.cancel() + LOGGER.debug( + "[async_queue_stream_flac] [%s] cancelled", + player_id, + ) + raise exc + else: + LOGGER.debug( + "[async_queue_stream_flac] [%s] finished", + player_id, + ) async def async_queue_stream_pcm( self, player_id, sample_rate=96000, bit_depth=32 diff --git a/music_assistant/mass.py b/music_assistant/mass.py index a087d887..a48d0e08 100644 --- a/music_assistant/mass.py +++ b/music_assistant/mass.py @@ -196,13 +196,16 @@ class MusicAssistant: @callback def get_providers( - self, filter_type: Optional[ProviderType] = None + self, + filter_type: Optional[ProviderType] = None, + include_unavailable: bool = False, ) -> List[Provider]: """Return all providers, optionally filtered by type.""" return [ item for item in self._providers.values() - if (filter_type is None or item.type == filter_type) and item.available + if (filter_type is None or item.type == filter_type) + and (include_unavailable or item.available) ] @callback diff --git a/music_assistant/providers/spotify/__init__.py b/music_assistant/providers/spotify/__init__.py index 2eb02288..39d22574 100644 --- a/music_assistant/providers/spotify/__init__.py +++ b/music_assistant/providers/spotify/__init__.py @@ -61,6 +61,8 @@ class SpotifyProvider(MusicProvider): __auth_token = None sp_user = None + _username = None + _password = None @property def id(self) -> str: diff --git a/music_assistant/web/endpoints/players.py b/music_assistant/web/endpoints/players.py index ae5c3b10..bac9e3b3 100644 --- a/music_assistant/web/endpoints/players.py +++ b/music_assistant/web/endpoints/players.py @@ -70,6 +70,8 @@ async def async_player_queue_item(request: Request): player_id = request.match_info.get("player_id") item_id = request.match_info.get("queue_item") player_queue = request.app["mass"].players.get_player_queue(player_id) + if not player_queue: + return Response(text="invalid player", status=404) try: item_id = int(item_id) queue_item = player_queue.get_item(item_id) @@ -84,6 +86,8 @@ async def async_player_queue_items(request: Request): """Return the items in the player's queue.""" player_id = request.match_info.get("player_id") player_queue = request.app["mass"].players.get_player_queue(player_id) + if not player_queue: + return Response(text="invalid player", status=404) async def async_queue_tracks_iter(): for item in player_queue.items: @@ -98,6 +102,8 @@ async def async_player_queue(request: Request): """Return the player queue details.""" player_id = request.match_info.get("player_id") player_queue = request.app["mass"].players.get_player_queue(player_id) + if not player_queue: + return Response(text="invalid player", status=404) return Response( body=json_serializer(player_queue.to_dict()), content_type="application/json" ) -- 2.34.1