From af61ff137e4280aacbce0c31cecca8bba7ad0332 Mon Sep 17 00:00:00 2001 From: Marcel van der Veldt Date: Wed, 7 Oct 2020 02:34:46 +0200 Subject: [PATCH] reinstate uvloop and chromecast fix --- music_assistant/__main__.py | 4 +- music_assistant/constants.py | 2 +- music_assistant/helpers/process.py | 292 ++++++++++++++++++ music_assistant/managers/streams.py | 222 ++++--------- .../providers/chromecast/player.py | 53 +++- .../providers/group_player/__init__.py | 50 +-- .../providers/squeezebox/socket_client.py | 2 +- music_assistant/web/endpoints/streams.py | 3 - 8 files changed, 409 insertions(+), 219 deletions(-) create mode 100644 music_assistant/helpers/process.py diff --git a/music_assistant/__main__.py b/music_assistant/__main__.py index ac7428ca..df4b264f 100755 --- a/music_assistant/__main__.py +++ b/music_assistant/__main__.py @@ -63,11 +63,9 @@ def main(): logger.info("shutdown requested!") loop.run_until_complete(mass.async_stop()) - # TODO: uvloop is temporary disabled due to a bug with subprocesses - # https://github.com/MagicStack/uvloop/issues/317 run( mass.async_start(), - use_uvloop=False, + use_uvloop=True, shutdown_callback=on_shutdown, executor_workers=64, ) diff --git a/music_assistant/constants.py b/music_assistant/constants.py index cb90e245..6d771a66 100755 --- a/music_assistant/constants.py +++ b/music_assistant/constants.py @@ -1,6 +1,6 @@ """All constants for Music Assistant.""" -__version__ = "0.0.51" +__version__ = "0.0.52" REQUIRED_PYTHON_VER = "3.7" # configuration keys/attributes diff --git a/music_assistant/helpers/process.py b/music_assistant/helpers/process.py new file mode 100644 index 00000000..d5e857ba --- /dev/null +++ b/music_assistant/helpers/process.py @@ -0,0 +1,292 @@ +""" +Implementation of a (truly) non blocking subprocess. + +The subprocess implementation in asyncio can (still) sometimes cause deadlocks, +even when properly handling reading/writes from different tasks. +Besides that, when using multiple asyncio subprocesses, together with uvloop +things go very wrong: https://github.com/MagicStack/uvloop/issues/317 + +As we rely a lot on moving chunks around through subprocesses (mainly sox), +this custom implementation can be seen as a temporary solution until the main issue +in uvloop is resolved. +""" + +import asyncio +import logging +import subprocess +import threading +import time +from typing import AsyncGenerator, List, Optional + +LOGGER = logging.getLogger("AsyncProcess") + + +class AsyncProcess(object): + """Implementation of a (truly) non blocking subprocess.""" + + def __init__( + self, + process_args: List, + chunksize=512000, + enable_write: bool = False, + enable_shell=False, + ): + """Initialize.""" + self._process_args = process_args + self._chunksize = chunksize + self._enable_write = enable_write + self._enable_shell = enable_shell + self._exit = False + self._proc = None + self._id = int(time.time()) # some identifier for logging + + async def __aenter__(self) -> "AsyncProcess": + """Enter context manager, start running the process in executor.""" + LOGGER.debug("[%s] Entered context manager", self._id) + self._proc = subprocess.Popen( + self._process_args, + **{ + "shell": self._enable_shell, + "stdout": subprocess.PIPE, + "stdin": subprocess.PIPE if self._enable_write else None, + }, + ) + return self + + async def __aexit__(self, exc_type, exc_value, traceback) -> bool: + """Exit context manager.""" + if exc_type: + LOGGER.debug( + "[%s] Context manager exit with exception %s (%s)", + self._id, + exc_type, + str(exc_value), + ) + else: + LOGGER.debug("[%s] Context manager exit", self._id) + # signal that we must exit + self._exit = True + + def close_proc(): + if self._proc and self._proc.poll() is None: + # there is no real clean way to do this with all the blocking pipes + self._proc.kill() + + await asyncio.get_running_loop().run_in_executor(None, close_proc) + LOGGER.debug("[%s] Cleanup finished", self._id) + return True + + async def iterate_chunks(self) -> AsyncGenerator[bytes, None]: + """Yield chunks from the output Queue. Generator.""" + LOGGER.debug("[%s] start reading from generator", self._id) + while not self._exit: + chunk = await self.read() + yield chunk + if len(chunk) < self._chunksize: + break + LOGGER.debug("[%s] finished reading from generator", self._id) + + async def read(self) -> bytes: + """Read single chunk from the output Queue.""" + + def try_read(): + try: + data = self._proc.stdout.read(self._chunksize) + return data + except BrokenPipeError: + return b"" + except Exception as exc: # pylint: disable=broad-except + LOGGER.exception(exc) + return b"" + + return await asyncio.get_running_loop().run_in_executor(None, try_read) + + async def write(self, data: bytes) -> None: + """Write data to process.""" + + def try_write(_data): + try: + self._proc.stdin.write(_data) + except BrokenPipeError: + pass + except Exception as exc: # pylint: disable=broad-except + LOGGER.exception(exc) + + await asyncio.get_running_loop().run_in_executor(None, try_write, data) + + async def write_eof(self) -> None: + """Write eof to process.""" + + def try_write(): + try: + self._proc.stdin.close() + except BrokenPipeError: + pass + except Exception as exc: # pylint: disable=broad-except + LOGGER.exception(exc) + + await asyncio.get_running_loop().run_in_executor(None, try_write) + + async def communicate(self, input_data: Optional[bytes] = None) -> bytes: + """Write bytes to process and read back results.""" + if not self._enable_write and input_data: + raise RuntimeError("Write is disabled") + if input_data: + await self.write(input_data) + output = b"" + async for chunk in self.iterate_chunks(): + output += chunk + return output + + +# first attempt with queues, too complicated +# left here as reference +class AsyncProcessWithQueues(object): + """Implementation of a (truly) non blocking subprocess.""" + + def __init__( + self, + process_args: List, + chunksize=512000, + enable_write: bool = False, + enable_shell=False, + ): + """Initialize.""" + self._process_args = process_args + self._chunksize = chunksize + self._enable_write = enable_write + self._enable_shell = enable_shell + # we have large chunks, limit the queue size a bit. + import janus + + self.__queue_in = janus.Queue(8) + self.__queue_out = janus.Queue(4) + self.__proc_task = None + self._exit = threading.Event() + self._id = int(time.time()) # some identifier for logging + + async def __aenter__(self) -> "AsyncProcess": + """Enter context manager, start running the process in executor.""" + LOGGER.debug("[%s] Entered context manager", self._id) + self.__proc_task = asyncio.get_running_loop().run_in_executor(None, self._run) + return self + + async def __aexit__(self, exc_type, exc_value, traceback) -> bool: + """Exit context manager.""" + if exc_type: + LOGGER.debug( + "[%s] Context manager exit with exception %s (%s)", + self._id, + exc_type, + str(exc_value), + ) + else: + LOGGER.debug("[%s] Context manager exit", self._id) + # signal that we must exit + self._exit.set() + # if self._proc and self._proc.poll() is None: + # asyncio.get_running_loop().run_in_executor(None, self._proc.communicate) + print("1") + self.__queue_out.close() + self.__queue_in.close() + print("2") + await self.__queue_out.wait_closed() + await self.__queue_in.wait_closed() + print("3") + # await executor job + self.__proc_task.cancel() + # await self.__proc_task + print("4") + + LOGGER.debug("[%s] Cleanup finished", self._id) + return True + + async def iterate_chunks(self) -> AsyncGenerator[bytes, None]: + """Yield chunks from the output Queue. Generator.""" + LOGGER.debug("[%s] start reading from generator", self._id) + while not self._exit.is_set(): + chunk = await self.__queue_out.async_q.get() + self.__queue_out.async_q.task_done() + if not chunk: + break + yield chunk + LOGGER.debug("[%s] finished reading from generator", self._id) + + async def read(self) -> bytes: + """Read single chunk from the output Queue.""" + chunk = await self.__queue_out.async_q.get() + self.__queue_out.async_q.task_done() + return chunk + + async def write(self, data: Optional[bytes] = None) -> None: + """Write data to process.""" + if not self._exit.is_set(): + await self.__queue_in.async_q.put(data) + + async def write_eof(self) -> None: + """Write eof to process stdin.""" + await self.write(b"") + + async def communicate(self, input_data: Optional[bytes] = None) -> bytes: + """Write bytes to process and read back results.""" + if not self._enable_write and input_data: + raise RuntimeError("Write is disabled") + if input_data: + await self.write(input_data) + await self.write_eof() + output = b"" + async for chunk in self.iterate_chunks(): + output += chunk + return output + + def _run(self): + """Run actual process in executor thread.""" + LOGGER.info( + "[%s] Starting process with args: %s", self._id, str(self._process_args) + ) + proc = subprocess.Popen( + self._process_args, + **{ + "shell": self._enable_shell, + "stdout": subprocess.PIPE, + "stdin": subprocess.PIPE if self._enable_write else None, + }, + ) + + # start fill buffer task in (yet another) background thread + def fill_buffer(): + LOGGER.debug("[%s] start fill buffer", self._id) + try: + while not self._exit.is_set() and not self.__queue_in.closed: + chunk = self.__queue_in.sync_q.get() + if not chunk: + break + proc.stdin.write(chunk) + except Exception as exc: # pylint: disable=broad-except + LOGGER.debug("[%s], fill buffer aborted (%s)", self._id, str(exc)) + else: + LOGGER.debug("[%s] fill buffer finished", self._id) + + if self._enable_write: + fill_buffer_thread = threading.Thread( + target=fill_buffer, name=f"AsyncProcess_{self._id}" + ) + fill_buffer_thread.start() + + # consume bytes from stdout + try: + while not self._exit.is_set() and not self.__queue_out.closed: + chunk = proc.stdout.read(self._chunksize) + self.__queue_out.sync_q.put(chunk) + if len(chunk) < self._chunksize: + LOGGER.debug("[%s] last chunk received on stdout", self._id) + break + if self._enable_write: + fill_buffer_thread.join() + # write empty chunk to out queue to indicate end of stream just in case + self.__queue_out.sync_q.put(b"") + finally: + LOGGER.info("[%s] wait for process exit", self._id) + # pickup remaining bytes if process is stull running + if proc.poll() is None: + proc.communicate() diff --git a/music_assistant/managers/streams.py b/music_assistant/managers/streams.py index edc78689..9ccdaa05 100755 --- a/music_assistant/managers/streams.py +++ b/music_assistant/managers/streams.py @@ -3,6 +3,8 @@ StreamManager: handles all audio streaming to players. Either by sending tracks one by one or send one continuous stream of music with crossfade/gapless support (queue stream). + +All audio is processed by the SoX executable, using various subprocess streams. """ import asyncio import gc @@ -22,6 +24,7 @@ from music_assistant.helpers.encryption import ( async_decrypt_string, encrypt_bytes, ) +from music_assistant.helpers.process import AsyncProcess from music_assistant.helpers.typing import MusicAssistantType from music_assistant.helpers.util import ( async_yield_chunks, @@ -60,7 +63,7 @@ class StreamManager: output_format: SoxOutputFormat = SoxOutputFormat.FLAC, resample: Optional[int] = None, gain_db_adjust: Optional[float] = None, - chunk_size: int = 5000000, + chunk_size: int = 1024000, ) -> AsyncGenerator[Tuple[bool, bytes], None]: """Get the sox manipulated audio data for the given streamdetails.""" # collect all args for sox @@ -81,57 +84,37 @@ class StreamManager: args += ["vol", str(gain_db_adjust), "dB"] if resample: args += ["rate", "-v", str(resample)] - if not chunk_size: - chunk_size = int( - streamdetails.sample_rate * (streamdetails.bit_depth / 8) * 2 * 10 - ) + LOGGER.debug( "[async_get_sox_stream] [%s/%s] started using args: %s", streamdetails.provider, streamdetails.item_id, " ".join(args), ) - # init the process with stdin/out pipes - sox_proc = await asyncio.create_subprocess_exec( - *args, - stdout=asyncio.subprocess.PIPE, - stdin=asyncio.subprocess.PIPE, - limit=chunk_size * 5, - ) + async with AsyncProcess(args, chunk_size, enable_write=True) as sox_proc: - async def fill_buffer(): - """Forward audio chunks to sox stdin.""" - LOGGER.debug( - "[async_get_sox_stream] [%s/%s] fill_buffer started", - streamdetails.provider, - streamdetails.item_id, - ) - # feed audio data into sox stdin for processing - async for chunk in self.async_get_media_stream(streamdetails): - if self.mass.exit: - break - sox_proc.stdin.write(chunk) - await sox_proc.stdin.drain() - # send eof when last chunk received - sox_proc.stdin.write_eof() - await sox_proc.stdin.drain() - LOGGER.debug( - "[async_get_sox_stream] [%s/%s] fill_buffer finished", - streamdetails.provider, - streamdetails.item_id, - ) + async def fill_buffer(): + """Forward audio chunks to sox stdin.""" + LOGGER.debug( + "[async_get_sox_stream] [%s/%s] fill_buffer started", + streamdetails.provider, + streamdetails.item_id, + ) + # feed audio data into sox stdin for processing + async for chunk in self.async_get_media_stream(streamdetails): + await sox_proc.write(chunk) + await sox_proc.write_eof() + LOGGER.debug( + "[async_get_sox_stream] [%s/%s] fill_buffer finished", + streamdetails.provider, + streamdetails.item_id, + ) - fill_buffer_task = self.mass.loop.create_task(fill_buffer()) - try: + fill_buffer_task = self.mass.loop.create_task(fill_buffer()) # yield chunks from stdout # we keep 1 chunk behind to detect end of stream properly prev_chunk = b"" - while True: - # read exactly chunksize of data - try: - chunk = await sox_proc.stdout.readexactly(chunk_size) - except asyncio.IncompleteReadError as exc: - chunk = exc.partial + async for chunk in sox_proc.iterate_chunks(): if len(chunk) < chunk_size: # last chunk yield (True, prev_chunk + chunk) @@ -142,19 +125,6 @@ class StreamManager: await asyncio.wait([fill_buffer_task]) - except (GeneratorExit, Exception): # pylint: disable=broad-except - LOGGER.warning( - "[async_get_sox_stream] [%s/%s] aborted", - streamdetails.provider, - streamdetails.item_id, - ) - if fill_buffer_task and not fill_buffer_task.cancelled(): - fill_buffer_task.cancel() - await sox_proc.communicate() - if sox_proc and sox_proc.returncode is None: - sox_proc.terminate() - await sox_proc.wait() - else: LOGGER.debug( "[async_get_sox_stream] [%s/%s] finished", streamdetails.provider, @@ -166,56 +136,35 @@ class StreamManager: chunk_size = 571392 # 74,7% of pcm args = ["sox", "-t", "s32", "-c", "2", "-r", "96000", "-", "-t", "flac", "-"] - sox_proc = await asyncio.create_subprocess_exec( - *args, - stdout=asyncio.subprocess.PIPE, - stdin=asyncio.subprocess.PIPE, - limit=chunk_size, - ) - LOGGER.debug( - "[async_queue_stream_flac] [%s] started using args: %s", - player_id, - " ".join(args), - ) + async with AsyncProcess(args, chunk_size, enable_write=True) as sox_proc: - # feed stdin with pcm samples - async def fill_buffer(): - """Feed audio data into sox stdin for processing.""" LOGGER.debug( - "[async_queue_stream_flac] [%s] fill buffer started", player_id - ) - async for chunk in self.async_queue_stream_pcm(player_id, 96000, 32): - if self.mass.exit: - return - sox_proc.stdin.write(chunk) - await sox_proc.stdin.drain() - # write eof when no more data - sox_proc.stdin.write_eof() - await sox_proc.stdin.drain() - LOGGER.debug( - "[async_queue_stream_flac] [%s] fill buffer finished", player_id + "[async_queue_stream_flac] [%s] started using args: %s", + player_id, + " ".join(args), ) - fill_buffer_task = self.mass.loop.create_task(fill_buffer()) - try: - # yield flac chunks from stdout - while True: - try: - chunk = await sox_proc.stdout.readexactly(chunk_size) - yield chunk - except asyncio.IncompleteReadError as exc: - chunk = exc.partial - yield chunk - break - except (GeneratorExit, Exception): # pylint: disable=broad-except - LOGGER.debug("[async_queue_stream_flac] [%s] aborted", player_id) - if fill_buffer_task and not fill_buffer_task.cancelled(): - fill_buffer_task.cancel() - await sox_proc.communicate() - if sox_proc and sox_proc.returncode is None: - sox_proc.terminate() - await sox_proc.wait() - else: + # feed stdin with pcm samples + async def fill_buffer(): + """Feed audio data into sox stdin for processing.""" + LOGGER.debug( + "[async_queue_stream_flac] [%s] fill buffer started", player_id + ) + async for chunk in self.async_queue_stream_pcm(player_id, 96000, 32): + if self.mass.exit: + return + await sox_proc.write(chunk) + # write eof when no more data + await sox_proc.write_eof() + LOGGER.debug( + "[async_queue_stream_flac] [%s] fill buffer finished", player_id + ) + + fill_buffer_task = self.mass.loop.create_task(fill_buffer()) + # start yielding audio chunks + async for chunk in sox_proc.iterate_chunks(): + yield chunk + await asyncio.wait([fill_buffer_task]) LOGGER.debug( "[async_queue_stream_flac] [%s] finished", player_id, @@ -418,6 +367,7 @@ class StreamManager: stream_path = await async_decrypt_string(streamdetails.path) stream_type = StreamType(streamdetails.type) audio_data = b"" + chunk_size = 512000 # Handle (optional) caching of audio data cache_id = f"{streamdetails.item_id}{streamdetails.provider}"[::-1] @@ -445,53 +395,30 @@ class StreamManager: ) if stream_type == StreamType.CACHE: - async for chunk in async_yield_chunks(audio_data, 512000): + async for chunk in async_yield_chunks(audio_data, chunk_size): yield chunk elif stream_type == StreamType.URL: async with self.mass.http_session.get(stream_path) as response: - async for chunk in response.content.iter_any(): + while True: + chunk = await response.content.read(chunk_size) + if not chunk: + break yield chunk if len(audio_data) < 100000000: audio_data += chunk elif stream_type == StreamType.FILE: async with AIOFile(stream_path) as afp: - async for chunk in Reader(afp): + async for chunk in Reader(afp, chunk_size=chunk_size): yield chunk if len(audio_data) < 100000000: audio_data += chunk elif stream_type == StreamType.EXECUTABLE: args = shlex.split(stream_path) - chunk_size = 512000 - process = await asyncio.create_subprocess_exec( - *args, stdout=asyncio.subprocess.PIPE, limit=chunk_size - ) - try: - while True: - # read exactly chunksize of data - try: - chunk = await process.stdout.readexactly(chunk_size) - except asyncio.IncompleteReadError as exc: - chunk = exc.partial + async with AsyncProcess(args, chunk_size, False) as process: + async for chunk in process.iterate_chunks(): yield chunk if len(audio_data) < 100000000: audio_data += chunk - if len(chunk) < chunk_size: - # last chunk - break - except (GeneratorExit, Exception) as exc: # pylint: disable=broad-except - LOGGER.warning( - "[async_get_media_stream] [%s/%s] Aborted: %s", - streamdetails.provider, - streamdetails.item_id, - str(exc), - ) - # read remaining bytes - process.terminate() - await process.communicate() - if process and process.returncode is None: - process.terminate() - await process.wait() - raise GeneratorExit from exc # signal end of stream event self.mass.signal_event(EVENT_STREAM_ENDED, streamdetails) @@ -578,32 +505,20 @@ async def async_crossfade_pcm_parts( fadeinfile = create_tempfile() args = ["sox", "--ignore-length", "-t"] + pcm_args args += ["-", "-t"] + pcm_args + [fadeinfile.name, "fade", "t", str(fade_length)] - process = await asyncio.create_subprocess_exec( - *args, stdin=asyncio.subprocess.PIPE, limit=10000000 - ) - await process.communicate(fade_in_part) + async with AsyncProcess(args, enable_write=True) as sox_proc: + await sox_proc.communicate(fade_in_part) # create fade-out part fadeoutfile = create_tempfile() args = ["sox", "--ignore-length", "-t"] + pcm_args + ["-", "-t"] + pcm_args args += [fadeoutfile.name, "reverse", "fade", "t", str(fade_length), "reverse"] - process = await asyncio.create_subprocess_exec( - *args, - stdout=asyncio.subprocess.PIPE, - stdin=asyncio.subprocess.PIPE, - limit=10000000, - ) - await process.communicate(fade_out_part) + async with AsyncProcess(args, enable_write=True) as sox_proc: + await sox_proc.communicate(fade_out_part) # create crossfade using sox and some temp files # TODO: figure out how to make this less complex and without the tempfiles args = ["sox", "-m", "-v", "1.0", "-t"] + pcm_args + [fadeoutfile.name, "-v", "1.0"] args += ["-t"] + pcm_args + [fadeinfile.name, "-t"] + pcm_args + ["-"] - process = await asyncio.create_subprocess_exec( - *args, - stdout=asyncio.subprocess.PIPE, - stdin=asyncio.subprocess.PIPE, - limit=10000000, - ) - crossfade_part, _ = await process.communicate() + async with AsyncProcess(args, enable_write=False) as sox_proc: + crossfade_part = await sox_proc.communicate() fadeinfile.close() fadeoutfile.close() del fadeinfile @@ -621,11 +536,6 @@ async def async_strip_silence( args += ["silence", "1", "0.1", "1%"] if reverse: args.append("reverse") - process = await asyncio.create_subprocess_exec( - *args, - stdin=asyncio.subprocess.PIPE, - stdout=asyncio.subprocess.PIPE, - limit=10000000, - ) - stripped_data, _ = await process.communicate(audio_data) + async with AsyncProcess(args, enable_write=True) as sox_proc: + stripped_data = await sox_proc.communicate(audio_data) return stripped_data diff --git a/music_assistant/providers/chromecast/player.py b/music_assistant/providers/chromecast/player.py index c5596df1..968de855 100644 --- a/music_assistant/providers/chromecast/player.py +++ b/music_assistant/providers/chromecast/player.py @@ -192,7 +192,9 @@ class ChromecastPlayer(Player): def set_cast_info(self, cast_info: ChromecastInfo) -> None: """Set the cast information and set up the chromecast object.""" self._cast_info = cast_info - if self._chromecast is not None: + if self._chromecast and not self._chromecast.socket_client.is_connected: + self.disconnect() + elif self._chromecast is not None: return LOGGER.debug( "[%s] Connecting to cast device by service %s", @@ -293,7 +295,9 @@ class ChromecastPlayer(Player): self._available = new_available self.update_state() if self._cast_info.is_audio_group and new_available: - self.mass.add_job(self._chromecast.mz_controller.update_members) + self.try_chromecast_command( + self._chromecast.mz_controller.update_members + ) async def async_on_update(self) -> None: """Call when player is periodically polled by the player manager (should_poll=True).""" @@ -301,7 +305,7 @@ class ChromecastPlayer(Player): "group_player" ): # the group player wants very accurate elapsed_time state so we request it very often - self.mass.add_job(self._chromecast.media_controller.update_status) + self.try_chromecast_command(self._chromecast.media_controller.update_status) self.update_state() # ========== Service Calls ========== @@ -309,31 +313,31 @@ class ChromecastPlayer(Player): async def async_cmd_stop(self) -> None: """Send stop command to player.""" if self._chromecast and self._chromecast.media_controller: - self.mass.add_job(self._chromecast.media_controller.stop) + self.try_chromecast_command(self._chromecast.media_controller.stop) async def async_cmd_play(self) -> None: """Send play command to player.""" if self._chromecast.media_controller: - self.mass.add_job(self._chromecast.media_controller.play) + self.try_chromecast_command(self._chromecast.media_controller.play) async def async_cmd_pause(self) -> None: """Send pause command to player.""" if self._chromecast.media_controller: - self.mass.add_job(self._chromecast.media_controller.pause) + self.try_chromecast_command(self._chromecast.media_controller.pause) async def async_cmd_next(self) -> None: """Send next track command to player.""" if self._chromecast.media_controller: - self.mass.add_job(self._chromecast.media_controller.queue_next) + self.try_chromecast_command(self._chromecast.media_controller.queue_next) async def async_cmd_previous(self) -> None: """Send previous track command to player.""" if self._chromecast.media_controller: - self.mass.add_job(self._chromecast.media_controller.queue_prev) + self.try_chromecast_command(self._chromecast.media_controller.queue_prev) async def async_cmd_power_on(self) -> None: """Send power ON command to player.""" - self.mass.add_job(self._chromecast.set_volume_muted, False) + self.try_chromecast_command(self._chromecast.set_volume_muted, False) async def async_cmd_power_off(self) -> None: """Send power OFF command to player.""" @@ -342,17 +346,17 @@ class ChromecastPlayer(Player): or self.media_status.player_is_paused or self.media_status.player_is_idle ): - self.mass.add_job(self._chromecast.media_controller.stop) + self.try_chromecast_command(self._chromecast.media_controller.stop) # chromecast has no real poweroff so we send mute instead - self.mass.add_job(self._chromecast.set_volume_muted, True) + self.try_chromecast_command(self._chromecast.set_volume_muted, True) async def async_cmd_volume_set(self, volume_level: int) -> None: """Send new volume level command to player.""" - self.mass.add_job(self._chromecast.set_volume, volume_level / 100) + self.try_chromecast_command(self._chromecast.set_volume, volume_level / 100) async def async_cmd_volume_mute(self, is_muted: bool = False) -> None: """Send mute command to player.""" - self.mass.add_job(self._chromecast.set_volume_muted, is_muted) + self.try_chromecast_command(self._chromecast.set_volume_muted, is_muted) async def async_cmd_play_uri(self, uri: str) -> None: """Play single uri on player.""" @@ -363,7 +367,7 @@ class ChromecastPlayer(Player): queue_item.name = "Music Assistant" queue_item.uri = uri return await self.async_cmd_queue_load([queue_item, queue_item]) - self.mass.add_job(self._chromecast.play_media, uri, "audio/flac") + self.try_chromecast_command(self._chromecast.play_media, uri, "audio/flac") async def async_cmd_queue_load(self, queue_items: List[QueueItem]) -> None: """Load (overwrite) queue with new items.""" @@ -378,7 +382,7 @@ class ChromecastPlayer(Player): "startIndex": 0, # Item index to play after this request or keep same item if undefined "items": cc_queue_items, # only load 50 tracks at once or the socket will crash } - self.mass.add_job(self.__send_player_queue, queuedata) + self.try_chromecast_command(self.__send_player_queue, queuedata) if len(queue_items) > 50: await self.async_cmd_queue_append(queue_items[51:]) @@ -391,7 +395,7 @@ class ChromecastPlayer(Player): "insertBefore": None, "items": chunk, } - self.mass.add_job(self.__send_player_queue, queuedata) + self.try_chromecast_command(self.__send_player_queue, queuedata) def __create_queue_items(self, tracks) -> None: """Create list of CC queue items from tracks.""" @@ -446,3 +450,20 @@ class ChromecastPlayer(Player): ) else: send_queue() + + def try_chromecast_command(self, func, *args, **kwargs): + """Try to execute Chromecast command.""" + + def handle_command(func, *args, **kwarg): + if not self._chromecast.socket_client.is_connected: + return + try: + return func(*args, **kwargs) + except Exception as exc: # pylint: disable=broad-except + LOGGER.error( + "Error while executing command on player %s: %s", + self.name, + str(exc), + ) + + self.mass.add_job(handle_command, func, *args, **kwargs) diff --git a/music_assistant/providers/group_player/__init__.py b/music_assistant/providers/group_player/__init__.py index 1f74e229..a0dec967 100644 --- a/music_assistant/providers/group_player/__init__.py +++ b/music_assistant/providers/group_player/__init__.py @@ -318,32 +318,11 @@ class GroupPlayer(Player): async def subscribe_stream_client(self, child_player_id): """Handle streaming to all players of a group. Highly experimental.""" - # each connected client gets its own sox process to convert incoming pcm samples - # to flac (which is streamed to the player). - args = [ - "sox", - "-t", - "s32", - "-c", - "2", - "-r", - "96000", - "-", - "-t", - "flac", - "-C", - "0", - "-", - ] - sox_proc = await asyncio.create_subprocess_exec( - *args, - stdout=asyncio.subprocess.PIPE, - stdin=asyncio.subprocess.PIPE, - ) - chunk_size = 2880000 # roughly 5 seconds of flac @ 96000/32 + # each connected client gets its own Queue to which audio chunks (flac) are sent try: # report this client as connected - self.connected_clients[child_player_id] = sox_proc.stdin + queue = asyncio.Queue() + self.connected_clients[child_player_id] = queue LOGGER.debug( "[%s] child player connected: %s", self.player_id, @@ -351,22 +330,16 @@ class GroupPlayer(Player): ) # yield flac chunks from stdout to the http streamresponse while True: - try: - chunk = await sox_proc.stdout.readexactly(chunk_size) - yield chunk - except asyncio.IncompleteReadError as exc: - chunk = exc.partial - yield chunk + chunk = await queue.get() + yield chunk + queue.task_done() + if not chunk: break except (GeneratorExit, Exception): # pylint: disable=broad-except LOGGER.warning( "[%s] child player aborted stream: %s", self.player_id, child_player_id ) self.connected_clients.pop(child_player_id, None) - await sox_proc.communicate() - if sox_proc and sox_proc.returncode is None: - sox_proc.terminate() - await sox_proc.wait() else: self.connected_clients.pop(child_player_id, None) LOGGER.debug( @@ -390,8 +363,8 @@ class GroupPlayer(Player): ) self.sync_task = asyncio.create_task(self.__synchronize_players()) - async for audio_chunk in self.mass.streams.async_queue_stream_pcm( - self.player_id, sample_rate=96000, bit_depth=32 + async for audio_chunk in self.mass.streams.async_queue_stream_flac( + self.player_id ): # make sure we still have clients connected @@ -401,9 +374,8 @@ class GroupPlayer(Player): # send the audio chunk to all connected players tasks = [] - for _writer in self.connected_clients.values(): - tasks.append(self.mass.add_job(_writer.write, audio_chunk)) - tasks.append(self.mass.add_job(_writer.drain())) + for _queue in self.connected_clients.values(): + tasks.append(self.mass.add_job(_queue.put(audio_chunk))) # wait for clients to consume the data await asyncio.wait(tasks) diff --git a/music_assistant/providers/squeezebox/socket_client.py b/music_assistant/providers/squeezebox/socket_client.py index 380b14c9..88c271c8 100644 --- a/music_assistant/providers/squeezebox/socket_client.py +++ b/music_assistant/providers/squeezebox/socket_client.py @@ -451,7 +451,7 @@ class SqueezeSocketClient: """Process incoming stat STMu message: Buffer underrun: Normal end of playback.""" # pylint: disable=unused-argument LOGGER.debug("STMu received - end of playback.") - self.state = STATE_STOPPED + self._state = STATE_STOPPED self.signal_event(SqueezeEvent.STATE_UPDATED) def _process_resp(self, data): diff --git a/music_assistant/web/endpoints/streams.py b/music_assistant/web/endpoints/streams.py index 54c0b94d..d62307ed 100644 --- a/music_assistant/web/endpoints/streams.py +++ b/music_assistant/web/endpoints/streams.py @@ -25,7 +25,6 @@ async def stream_media(request: Request): resp = StreamResponse( status=200, reason="OK", headers={"Content-Type": f"audio/{content_type}"} ) - resp.enable_chunked_encoding() await resp.prepare(request) # stream track @@ -70,7 +69,6 @@ async def stream_queue_item(request: Request): resp = StreamResponse( status=200, reason="OK", headers={"Content-Type": "audio/flac"} ) - resp.enable_chunked_encoding() await resp.prepare(request) async for audio_chunk in request.app["mass"].streams.async_stream_queue_item( @@ -93,7 +91,6 @@ async def stream_group(request: Request): resp = StreamResponse( status=200, reason="OK", headers={"Content-Type": "audio/flac"} ) - resp.enable_chunked_encoding() await resp.prepare(request) # stream queue -- 2.34.1