reinstate uvloop and chromecast fix
authorMarcel van der Veldt <m.vanderveldt@outlook.com>
Wed, 7 Oct 2020 00:34:46 +0000 (02:34 +0200)
committerMarcel van der Veldt <m.vanderveldt@outlook.com>
Wed, 7 Oct 2020 00:34:46 +0000 (02:34 +0200)
music_assistant/__main__.py
music_assistant/constants.py
music_assistant/helpers/process.py [new file with mode: 0644]
music_assistant/managers/streams.py
music_assistant/providers/chromecast/player.py
music_assistant/providers/group_player/__init__.py
music_assistant/providers/squeezebox/socket_client.py
music_assistant/web/endpoints/streams.py

index ac7428ca422d477e2fbc3d713fa09626ec31a0fc..df4b264ffe75fca131aeceecb6e7695502e40651 100755 (executable)
@@ -63,11 +63,9 @@ def main():
         logger.info("shutdown requested!")
         loop.run_until_complete(mass.async_stop())
 
-    # TODO: uvloop is temporary disabled due to a bug with subprocesses
-    # https://github.com/MagicStack/uvloop/issues/317
     run(
         mass.async_start(),
-        use_uvloop=False,
+        use_uvloop=True,
         shutdown_callback=on_shutdown,
         executor_workers=64,
     )
index cb90e24531895710ba70b523e6dd05b36ee8c6d4..6d771a661ff765b58509dc759eafbb767d6ffbde 100755 (executable)
@@ -1,6 +1,6 @@
 """All constants for Music Assistant."""
 
-__version__ = "0.0.51"
+__version__ = "0.0.52"
 REQUIRED_PYTHON_VER = "3.7"
 
 # configuration keys/attributes
diff --git a/music_assistant/helpers/process.py b/music_assistant/helpers/process.py
new file mode 100644 (file)
index 0000000..d5e857b
--- /dev/null
@@ -0,0 +1,292 @@
+"""
+Implementation of a (truly) non blocking subprocess.
+
+The subprocess implementation in asyncio can (still) sometimes cause deadlocks,
+even when properly handling reading/writes from different tasks.
+Besides that, when using multiple asyncio subprocesses, together with uvloop
+things go very wrong: https://github.com/MagicStack/uvloop/issues/317
+
+As we rely a lot on moving chunks around through subprocesses (mainly sox),
+this custom implementation can be seen as a temporary solution until the main issue
+in uvloop is resolved.
+"""
+
+import asyncio
+import logging
+import subprocess
+import threading
+import time
+from typing import AsyncGenerator, List, Optional
+
+LOGGER = logging.getLogger("AsyncProcess")
+
+
+class AsyncProcess(object):
+    """Implementation of a (truly) non blocking subprocess."""
+
+    def __init__(
+        self,
+        process_args: List,
+        chunksize=512000,
+        enable_write: bool = False,
+        enable_shell=False,
+    ):
+        """Initialize."""
+        self._process_args = process_args
+        self._chunksize = chunksize
+        self._enable_write = enable_write
+        self._enable_shell = enable_shell
+        self._exit = False
+        self._proc = None
+        self._id = int(time.time())  # some identifier for logging
+
+    async def __aenter__(self) -> "AsyncProcess":
+        """Enter context manager, start running the process in executor."""
+        LOGGER.debug("[%s] Entered context manager", self._id)
+        self._proc = subprocess.Popen(
+            self._process_args,
+            **{
+                "shell": self._enable_shell,
+                "stdout": subprocess.PIPE,
+                "stdin": subprocess.PIPE if self._enable_write else None,
+            },
+        )
+        return self
+
+    async def __aexit__(self, exc_type, exc_value, traceback) -> bool:
+        """Exit context manager."""
+        if exc_type:
+            LOGGER.debug(
+                "[%s] Context manager exit with exception %s (%s)",
+                self._id,
+                exc_type,
+                str(exc_value),
+            )
+        else:
+            LOGGER.debug("[%s] Context manager exit", self._id)
+        # signal that we must exit
+        self._exit = True
+
+        def close_proc():
+            if self._proc and self._proc.poll() is None:
+                # there is no real clean way to do this with all the blocking pipes
+                self._proc.kill()
+
+        await asyncio.get_running_loop().run_in_executor(None, close_proc)
+        LOGGER.debug("[%s] Cleanup finished", self._id)
+        return True
+
+    async def iterate_chunks(self) -> AsyncGenerator[bytes, None]:
+        """Yield chunks from the output Queue. Generator."""
+        LOGGER.debug("[%s] start reading from generator", self._id)
+        while not self._exit:
+            chunk = await self.read()
+            yield chunk
+            if len(chunk) < self._chunksize:
+                break
+        LOGGER.debug("[%s] finished reading from generator", self._id)
+
+    async def read(self) -> bytes:
+        """Read single chunk from the output Queue."""
+
+        def try_read():
+            try:
+                data = self._proc.stdout.read(self._chunksize)
+                return data
+            except BrokenPipeError:
+                return b""
+            except Exception as exc:  # pylint: disable=broad-except
+                LOGGER.exception(exc)
+                return b""
+
+        return await asyncio.get_running_loop().run_in_executor(None, try_read)
+
+    async def write(self, data: bytes) -> None:
+        """Write data to process."""
+
+        def try_write(_data):
+            try:
+                self._proc.stdin.write(_data)
+            except BrokenPipeError:
+                pass
+            except Exception as exc:  # pylint: disable=broad-except
+                LOGGER.exception(exc)
+
+        await asyncio.get_running_loop().run_in_executor(None, try_write, data)
+
+    async def write_eof(self) -> None:
+        """Write eof to process."""
+
+        def try_write():
+            try:
+                self._proc.stdin.close()
+            except BrokenPipeError:
+                pass
+            except Exception as exc:  # pylint: disable=broad-except
+                LOGGER.exception(exc)
+
+        await asyncio.get_running_loop().run_in_executor(None, try_write)
+
+    async def communicate(self, input_data: Optional[bytes] = None) -> bytes:
+        """Write bytes to process and read back results."""
+        if not self._enable_write and input_data:
+            raise RuntimeError("Write is disabled")
+        if input_data:
+            await self.write(input_data)
+        output = b""
+        async for chunk in self.iterate_chunks():
+            output += chunk
+        return output
+
+
+# first attempt with queues, too complicated
+# left here as reference
+class AsyncProcessWithQueues(object):
+    """Implementation of a (truly) non blocking subprocess."""
+
+    def __init__(
+        self,
+        process_args: List,
+        chunksize=512000,
+        enable_write: bool = False,
+        enable_shell=False,
+    ):
+        """Initialize."""
+        self._process_args = process_args
+        self._chunksize = chunksize
+        self._enable_write = enable_write
+        self._enable_shell = enable_shell
+        # we have large chunks, limit the queue size a bit.
+        import janus
+
+        self.__queue_in = janus.Queue(8)
+        self.__queue_out = janus.Queue(4)
+        self.__proc_task = None
+        self._exit = threading.Event()
+        self._id = int(time.time())  # some identifier for logging
+
+    async def __aenter__(self) -> "AsyncProcess":
+        """Enter context manager, start running the process in executor."""
+        LOGGER.debug("[%s] Entered context manager", self._id)
+        self.__proc_task = asyncio.get_running_loop().run_in_executor(None, self._run)
+        return self
+
+    async def __aexit__(self, exc_type, exc_value, traceback) -> bool:
+        """Exit context manager."""
+        if exc_type:
+            LOGGER.debug(
+                "[%s] Context manager exit with exception %s (%s)",
+                self._id,
+                exc_type,
+                str(exc_value),
+            )
+        else:
+            LOGGER.debug("[%s] Context manager exit", self._id)
+        # signal that we must exit
+        self._exit.set()
+        # if self._proc and self._proc.poll() is None:
+        #     asyncio.get_running_loop().run_in_executor(None, self._proc.communicate)
+        print("1")
+        self.__queue_out.close()
+        self.__queue_in.close()
+        print("2")
+        await self.__queue_out.wait_closed()
+        await self.__queue_in.wait_closed()
+        print("3")
+        # await executor job
+        self.__proc_task.cancel()
+        # await self.__proc_task
+        print("4")
+
+        LOGGER.debug("[%s] Cleanup finished", self._id)
+        return True
+
+    async def iterate_chunks(self) -> AsyncGenerator[bytes, None]:
+        """Yield chunks from the output Queue. Generator."""
+        LOGGER.debug("[%s] start reading from generator", self._id)
+        while not self._exit.is_set():
+            chunk = await self.__queue_out.async_q.get()
+            self.__queue_out.async_q.task_done()
+            if not chunk:
+                break
+            yield chunk
+        LOGGER.debug("[%s] finished reading from generator", self._id)
+
+    async def read(self) -> bytes:
+        """Read single chunk from the output Queue."""
+        chunk = await self.__queue_out.async_q.get()
+        self.__queue_out.async_q.task_done()
+        return chunk
+
+    async def write(self, data: Optional[bytes] = None) -> None:
+        """Write data to process."""
+        if not self._exit.is_set():
+            await self.__queue_in.async_q.put(data)
+
+    async def write_eof(self) -> None:
+        """Write eof to process stdin."""
+        await self.write(b"")
+
+    async def communicate(self, input_data: Optional[bytes] = None) -> bytes:
+        """Write bytes to process and read back results."""
+        if not self._enable_write and input_data:
+            raise RuntimeError("Write is disabled")
+        if input_data:
+            await self.write(input_data)
+            await self.write_eof()
+        output = b""
+        async for chunk in self.iterate_chunks():
+            output += chunk
+        return output
+
+    def _run(self):
+        """Run actual process in executor thread."""
+        LOGGER.info(
+            "[%s] Starting process with args: %s", self._id, str(self._process_args)
+        )
+        proc = subprocess.Popen(
+            self._process_args,
+            **{
+                "shell": self._enable_shell,
+                "stdout": subprocess.PIPE,
+                "stdin": subprocess.PIPE if self._enable_write else None,
+            },
+        )
+
+        # start fill buffer task in (yet another) background thread
+        def fill_buffer():
+            LOGGER.debug("[%s] start fill buffer", self._id)
+            try:
+                while not self._exit.is_set() and not self.__queue_in.closed:
+                    chunk = self.__queue_in.sync_q.get()
+                    if not chunk:
+                        break
+                    proc.stdin.write(chunk)
+            except Exception as exc:  # pylint: disable=broad-except
+                LOGGER.debug("[%s], fill buffer aborted (%s)", self._id, str(exc))
+            else:
+                LOGGER.debug("[%s] fill buffer finished", self._id)
+
+        if self._enable_write:
+            fill_buffer_thread = threading.Thread(
+                target=fill_buffer, name=f"AsyncProcess_{self._id}"
+            )
+            fill_buffer_thread.start()
+
+        # consume bytes from stdout
+        try:
+            while not self._exit.is_set() and not self.__queue_out.closed:
+                chunk = proc.stdout.read(self._chunksize)
+                self.__queue_out.sync_q.put(chunk)
+                if len(chunk) < self._chunksize:
+                    LOGGER.debug("[%s] last chunk received on stdout", self._id)
+                    break
+            if self._enable_write:
+                fill_buffer_thread.join()
+            # write empty chunk to out queue to indicate end of stream just in case
+            self.__queue_out.sync_q.put(b"")
+        finally:
+            LOGGER.info("[%s] wait for process exit", self._id)
+            # pickup remaining bytes if process is stull running
+            if proc.poll() is None:
+                proc.communicate()
index edc786898796f11aacdac7f5df3df14b523001b0..9ccdaa05ba796987b8c81f8dcf8ea922b4379c94 100755 (executable)
@@ -3,6 +3,8 @@ StreamManager: handles all audio streaming to players.
 
 Either by sending tracks one by one or send one continuous stream
 of music with crossfade/gapless support (queue stream).
+
+All audio is processed by the SoX executable, using various subprocess streams.
 """
 import asyncio
 import gc
@@ -22,6 +24,7 @@ from music_assistant.helpers.encryption import (
     async_decrypt_string,
     encrypt_bytes,
 )
+from music_assistant.helpers.process import AsyncProcess
 from music_assistant.helpers.typing import MusicAssistantType
 from music_assistant.helpers.util import (
     async_yield_chunks,
@@ -60,7 +63,7 @@ class StreamManager:
         output_format: SoxOutputFormat = SoxOutputFormat.FLAC,
         resample: Optional[int] = None,
         gain_db_adjust: Optional[float] = None,
-        chunk_size: int = 5000000,
+        chunk_size: int = 1024000,
     ) -> AsyncGenerator[Tuple[bool, bytes], None]:
         """Get the sox manipulated audio data for the given streamdetails."""
         # collect all args for sox
@@ -81,57 +84,37 @@ class StreamManager:
             args += ["vol", str(gain_db_adjust), "dB"]
         if resample:
             args += ["rate", "-v", str(resample)]
-        if not chunk_size:
-            chunk_size = int(
-                streamdetails.sample_rate * (streamdetails.bit_depth / 8) * 2 * 10
-            )
+
         LOGGER.debug(
             "[async_get_sox_stream] [%s/%s] started using args: %s",
             streamdetails.provider,
             streamdetails.item_id,
             " ".join(args),
         )
-        # init the process with stdin/out pipes
-        sox_proc = await asyncio.create_subprocess_exec(
-            *args,
-            stdout=asyncio.subprocess.PIPE,
-            stdin=asyncio.subprocess.PIPE,
-            limit=chunk_size * 5,
-        )
+        async with AsyncProcess(args, chunk_size, enable_write=True) as sox_proc:
 
-        async def fill_buffer():
-            """Forward audio chunks to sox stdin."""
-            LOGGER.debug(
-                "[async_get_sox_stream] [%s/%s] fill_buffer started",
-                streamdetails.provider,
-                streamdetails.item_id,
-            )
-            # feed audio data into sox stdin for processing
-            async for chunk in self.async_get_media_stream(streamdetails):
-                if self.mass.exit:
-                    break
-                sox_proc.stdin.write(chunk)
-                await sox_proc.stdin.drain()
-            # send eof when last chunk received
-            sox_proc.stdin.write_eof()
-            await sox_proc.stdin.drain()
-            LOGGER.debug(
-                "[async_get_sox_stream] [%s/%s] fill_buffer finished",
-                streamdetails.provider,
-                streamdetails.item_id,
-            )
+            async def fill_buffer():
+                """Forward audio chunks to sox stdin."""
+                LOGGER.debug(
+                    "[async_get_sox_stream] [%s/%s] fill_buffer started",
+                    streamdetails.provider,
+                    streamdetails.item_id,
+                )
+                # feed audio data into sox stdin for processing
+                async for chunk in self.async_get_media_stream(streamdetails):
+                    await sox_proc.write(chunk)
+                await sox_proc.write_eof()
+                LOGGER.debug(
+                    "[async_get_sox_stream] [%s/%s] fill_buffer finished",
+                    streamdetails.provider,
+                    streamdetails.item_id,
+                )
 
-        fill_buffer_task = self.mass.loop.create_task(fill_buffer())
-        try:
+            fill_buffer_task = self.mass.loop.create_task(fill_buffer())
             # yield chunks from stdout
             # we keep 1 chunk behind to detect end of stream properly
             prev_chunk = b""
-            while True:
-                # read exactly chunksize of data
-                try:
-                    chunk = await sox_proc.stdout.readexactly(chunk_size)
-                except asyncio.IncompleteReadError as exc:
-                    chunk = exc.partial
+            async for chunk in sox_proc.iterate_chunks():
                 if len(chunk) < chunk_size:
                     # last chunk
                     yield (True, prev_chunk + chunk)
@@ -142,19 +125,6 @@ class StreamManager:
 
             await asyncio.wait([fill_buffer_task])
 
-        except (GeneratorExit, Exception):  # pylint: disable=broad-except
-            LOGGER.warning(
-                "[async_get_sox_stream] [%s/%s] aborted",
-                streamdetails.provider,
-                streamdetails.item_id,
-            )
-            if fill_buffer_task and not fill_buffer_task.cancelled():
-                fill_buffer_task.cancel()
-            await sox_proc.communicate()
-            if sox_proc and sox_proc.returncode is None:
-                sox_proc.terminate()
-                await sox_proc.wait()
-        else:
             LOGGER.debug(
                 "[async_get_sox_stream] [%s/%s] finished",
                 streamdetails.provider,
@@ -166,56 +136,35 @@ class StreamManager:
         chunk_size = 571392  # 74,7% of pcm
 
         args = ["sox", "-t", "s32", "-c", "2", "-r", "96000", "-", "-t", "flac", "-"]
-        sox_proc = await asyncio.create_subprocess_exec(
-            *args,
-            stdout=asyncio.subprocess.PIPE,
-            stdin=asyncio.subprocess.PIPE,
-            limit=chunk_size,
-        )
-        LOGGER.debug(
-            "[async_queue_stream_flac] [%s] started using args: %s",
-            player_id,
-            " ".join(args),
-        )
+        async with AsyncProcess(args, chunk_size, enable_write=True) as sox_proc:
 
-        # feed stdin with pcm samples
-        async def fill_buffer():
-            """Feed audio data into sox stdin for processing."""
             LOGGER.debug(
-                "[async_queue_stream_flac] [%s] fill buffer started", player_id
-            )
-            async for chunk in self.async_queue_stream_pcm(player_id, 96000, 32):
-                if self.mass.exit:
-                    return
-                sox_proc.stdin.write(chunk)
-                await sox_proc.stdin.drain()
-            # write eof when no more data
-            sox_proc.stdin.write_eof()
-            await sox_proc.stdin.drain()
-            LOGGER.debug(
-                "[async_queue_stream_flac] [%s] fill buffer finished", player_id
+                "[async_queue_stream_flac] [%s] started using args: %s",
+                player_id,
+                " ".join(args),
             )
 
-        fill_buffer_task = self.mass.loop.create_task(fill_buffer())
-        try:
-            # yield flac chunks from stdout
-            while True:
-                try:
-                    chunk = await sox_proc.stdout.readexactly(chunk_size)
-                    yield chunk
-                except asyncio.IncompleteReadError as exc:
-                    chunk = exc.partial
-                    yield chunk
-                    break
-        except (GeneratorExit, Exception):  # pylint: disable=broad-except
-            LOGGER.debug("[async_queue_stream_flac] [%s] aborted", player_id)
-            if fill_buffer_task and not fill_buffer_task.cancelled():
-                fill_buffer_task.cancel()
-            await sox_proc.communicate()
-            if sox_proc and sox_proc.returncode is None:
-                sox_proc.terminate()
-                await sox_proc.wait()
-        else:
+            # feed stdin with pcm samples
+            async def fill_buffer():
+                """Feed audio data into sox stdin for processing."""
+                LOGGER.debug(
+                    "[async_queue_stream_flac] [%s] fill buffer started", player_id
+                )
+                async for chunk in self.async_queue_stream_pcm(player_id, 96000, 32):
+                    if self.mass.exit:
+                        return
+                    await sox_proc.write(chunk)
+                # write eof when no more data
+                await sox_proc.write_eof()
+                LOGGER.debug(
+                    "[async_queue_stream_flac] [%s] fill buffer finished", player_id
+                )
+
+            fill_buffer_task = self.mass.loop.create_task(fill_buffer())
+            # start yielding audio chunks
+            async for chunk in sox_proc.iterate_chunks():
+                yield chunk
+            await asyncio.wait([fill_buffer_task])
             LOGGER.debug(
                 "[async_queue_stream_flac] [%s] finished",
                 player_id,
@@ -418,6 +367,7 @@ class StreamManager:
         stream_path = await async_decrypt_string(streamdetails.path)
         stream_type = StreamType(streamdetails.type)
         audio_data = b""
+        chunk_size = 512000
 
         # Handle (optional) caching of audio data
         cache_id = f"{streamdetails.item_id}{streamdetails.provider}"[::-1]
@@ -445,53 +395,30 @@ class StreamManager:
         )
 
         if stream_type == StreamType.CACHE:
-            async for chunk in async_yield_chunks(audio_data, 512000):
+            async for chunk in async_yield_chunks(audio_data, chunk_size):
                 yield chunk
         elif stream_type == StreamType.URL:
             async with self.mass.http_session.get(stream_path) as response:
-                async for chunk in response.content.iter_any():
+                while True:
+                    chunk = await response.content.read(chunk_size)
+                    if not chunk:
+                        break
                     yield chunk
                     if len(audio_data) < 100000000:
                         audio_data += chunk
         elif stream_type == StreamType.FILE:
             async with AIOFile(stream_path) as afp:
-                async for chunk in Reader(afp):
+                async for chunk in Reader(afp, chunk_size=chunk_size):
                     yield chunk
                     if len(audio_data) < 100000000:
                         audio_data += chunk
         elif stream_type == StreamType.EXECUTABLE:
             args = shlex.split(stream_path)
-            chunk_size = 512000
-            process = await asyncio.create_subprocess_exec(
-                *args, stdout=asyncio.subprocess.PIPE, limit=chunk_size
-            )
-            try:
-                while True:
-                    # read exactly chunksize of data
-                    try:
-                        chunk = await process.stdout.readexactly(chunk_size)
-                    except asyncio.IncompleteReadError as exc:
-                        chunk = exc.partial
+            async with AsyncProcess(args, chunk_size, False) as process:
+                async for chunk in process.iterate_chunks():
                     yield chunk
                     if len(audio_data) < 100000000:
                         audio_data += chunk
-                    if len(chunk) < chunk_size:
-                        # last chunk
-                        break
-            except (GeneratorExit, Exception) as exc:  # pylint: disable=broad-except
-                LOGGER.warning(
-                    "[async_get_media_stream] [%s/%s] Aborted: %s",
-                    streamdetails.provider,
-                    streamdetails.item_id,
-                    str(exc),
-                )
-                # read remaining bytes
-                process.terminate()
-                await process.communicate()
-                if process and process.returncode is None:
-                    process.terminate()
-                    await process.wait()
-                raise GeneratorExit from exc
 
         # signal end of stream event
         self.mass.signal_event(EVENT_STREAM_ENDED, streamdetails)
@@ -578,32 +505,20 @@ async def async_crossfade_pcm_parts(
     fadeinfile = create_tempfile()
     args = ["sox", "--ignore-length", "-t"] + pcm_args
     args += ["-", "-t"] + pcm_args + [fadeinfile.name, "fade", "t", str(fade_length)]
-    process = await asyncio.create_subprocess_exec(
-        *args, stdin=asyncio.subprocess.PIPE, limit=10000000
-    )
-    await process.communicate(fade_in_part)
+    async with AsyncProcess(args, enable_write=True) as sox_proc:
+        await sox_proc.communicate(fade_in_part)
     # create fade-out part
     fadeoutfile = create_tempfile()
     args = ["sox", "--ignore-length", "-t"] + pcm_args + ["-", "-t"] + pcm_args
     args += [fadeoutfile.name, "reverse", "fade", "t", str(fade_length), "reverse"]
-    process = await asyncio.create_subprocess_exec(
-        *args,
-        stdout=asyncio.subprocess.PIPE,
-        stdin=asyncio.subprocess.PIPE,
-        limit=10000000,
-    )
-    await process.communicate(fade_out_part)
+    async with AsyncProcess(args, enable_write=True) as sox_proc:
+        await sox_proc.communicate(fade_out_part)
     # create crossfade using sox and some temp files
     # TODO: figure out how to make this less complex and without the tempfiles
     args = ["sox", "-m", "-v", "1.0", "-t"] + pcm_args + [fadeoutfile.name, "-v", "1.0"]
     args += ["-t"] + pcm_args + [fadeinfile.name, "-t"] + pcm_args + ["-"]
-    process = await asyncio.create_subprocess_exec(
-        *args,
-        stdout=asyncio.subprocess.PIPE,
-        stdin=asyncio.subprocess.PIPE,
-        limit=10000000,
-    )
-    crossfade_part, _ = await process.communicate()
+    async with AsyncProcess(args, enable_write=False) as sox_proc:
+        crossfade_part = await sox_proc.communicate()
     fadeinfile.close()
     fadeoutfile.close()
     del fadeinfile
@@ -621,11 +536,6 @@ async def async_strip_silence(
     args += ["silence", "1", "0.1", "1%"]
     if reverse:
         args.append("reverse")
-    process = await asyncio.create_subprocess_exec(
-        *args,
-        stdin=asyncio.subprocess.PIPE,
-        stdout=asyncio.subprocess.PIPE,
-        limit=10000000,
-    )
-    stripped_data, _ = await process.communicate(audio_data)
+    async with AsyncProcess(args, enable_write=True) as sox_proc:
+        stripped_data = await sox_proc.communicate(audio_data)
     return stripped_data
index c5596df1eedbffe70d1771d10e94dcf07b23281c..968de855b19d53de7a051c702a9b3b6a414293da 100644 (file)
@@ -192,7 +192,9 @@ class ChromecastPlayer(Player):
     def set_cast_info(self, cast_info: ChromecastInfo) -> None:
         """Set the cast information and set up the chromecast object."""
         self._cast_info = cast_info
-        if self._chromecast is not None:
+        if self._chromecast and not self._chromecast.socket_client.is_connected:
+            self.disconnect()
+        elif self._chromecast is not None:
             return
         LOGGER.debug(
             "[%s] Connecting to cast device by service %s",
@@ -293,7 +295,9 @@ class ChromecastPlayer(Player):
             self._available = new_available
             self.update_state()
             if self._cast_info.is_audio_group and new_available:
-                self.mass.add_job(self._chromecast.mz_controller.update_members)
+                self.try_chromecast_command(
+                    self._chromecast.mz_controller.update_members
+                )
 
     async def async_on_update(self) -> None:
         """Call when player is periodically polled by the player manager (should_poll=True)."""
@@ -301,7 +305,7 @@ class ChromecastPlayer(Player):
             "group_player"
         ):
             # the group player wants very accurate elapsed_time state so we request it very often
-            self.mass.add_job(self._chromecast.media_controller.update_status)
+            self.try_chromecast_command(self._chromecast.media_controller.update_status)
         self.update_state()
 
     # ========== Service Calls ==========
@@ -309,31 +313,31 @@ class ChromecastPlayer(Player):
     async def async_cmd_stop(self) -> None:
         """Send stop command to player."""
         if self._chromecast and self._chromecast.media_controller:
-            self.mass.add_job(self._chromecast.media_controller.stop)
+            self.try_chromecast_command(self._chromecast.media_controller.stop)
 
     async def async_cmd_play(self) -> None:
         """Send play command to player."""
         if self._chromecast.media_controller:
-            self.mass.add_job(self._chromecast.media_controller.play)
+            self.try_chromecast_command(self._chromecast.media_controller.play)
 
     async def async_cmd_pause(self) -> None:
         """Send pause command to player."""
         if self._chromecast.media_controller:
-            self.mass.add_job(self._chromecast.media_controller.pause)
+            self.try_chromecast_command(self._chromecast.media_controller.pause)
 
     async def async_cmd_next(self) -> None:
         """Send next track command to player."""
         if self._chromecast.media_controller:
-            self.mass.add_job(self._chromecast.media_controller.queue_next)
+            self.try_chromecast_command(self._chromecast.media_controller.queue_next)
 
     async def async_cmd_previous(self) -> None:
         """Send previous track command to player."""
         if self._chromecast.media_controller:
-            self.mass.add_job(self._chromecast.media_controller.queue_prev)
+            self.try_chromecast_command(self._chromecast.media_controller.queue_prev)
 
     async def async_cmd_power_on(self) -> None:
         """Send power ON command to player."""
-        self.mass.add_job(self._chromecast.set_volume_muted, False)
+        self.try_chromecast_command(self._chromecast.set_volume_muted, False)
 
     async def async_cmd_power_off(self) -> None:
         """Send power OFF command to player."""
@@ -342,17 +346,17 @@ class ChromecastPlayer(Player):
             or self.media_status.player_is_paused
             or self.media_status.player_is_idle
         ):
-            self.mass.add_job(self._chromecast.media_controller.stop)
+            self.try_chromecast_command(self._chromecast.media_controller.stop)
         # chromecast has no real poweroff so we send mute instead
-        self.mass.add_job(self._chromecast.set_volume_muted, True)
+        self.try_chromecast_command(self._chromecast.set_volume_muted, True)
 
     async def async_cmd_volume_set(self, volume_level: int) -> None:
         """Send new volume level command to player."""
-        self.mass.add_job(self._chromecast.set_volume, volume_level / 100)
+        self.try_chromecast_command(self._chromecast.set_volume, volume_level / 100)
 
     async def async_cmd_volume_mute(self, is_muted: bool = False) -> None:
         """Send mute command to player."""
-        self.mass.add_job(self._chromecast.set_volume_muted, is_muted)
+        self.try_chromecast_command(self._chromecast.set_volume_muted, is_muted)
 
     async def async_cmd_play_uri(self, uri: str) -> None:
         """Play single uri on player."""
@@ -363,7 +367,7 @@ class ChromecastPlayer(Player):
             queue_item.name = "Music Assistant"
             queue_item.uri = uri
             return await self.async_cmd_queue_load([queue_item, queue_item])
-        self.mass.add_job(self._chromecast.play_media, uri, "audio/flac")
+        self.try_chromecast_command(self._chromecast.play_media, uri, "audio/flac")
 
     async def async_cmd_queue_load(self, queue_items: List[QueueItem]) -> None:
         """Load (overwrite) queue with new items."""
@@ -378,7 +382,7 @@ class ChromecastPlayer(Player):
             "startIndex": 0,  # Item index to play after this request or keep same item if undefined
             "items": cc_queue_items,  # only load 50 tracks at once or the socket will crash
         }
-        self.mass.add_job(self.__send_player_queue, queuedata)
+        self.try_chromecast_command(self.__send_player_queue, queuedata)
         if len(queue_items) > 50:
             await self.async_cmd_queue_append(queue_items[51:])
 
@@ -391,7 +395,7 @@ class ChromecastPlayer(Player):
                 "insertBefore": None,
                 "items": chunk,
             }
-            self.mass.add_job(self.__send_player_queue, queuedata)
+            self.try_chromecast_command(self.__send_player_queue, queuedata)
 
     def __create_queue_items(self, tracks) -> None:
         """Create list of CC queue items from tracks."""
@@ -446,3 +450,20 @@ class ChromecastPlayer(Player):
             )
         else:
             send_queue()
+
+    def try_chromecast_command(self, func, *args, **kwargs):
+        """Try to execute Chromecast command."""
+
+        def handle_command(func, *args, **kwarg):
+            if not self._chromecast.socket_client.is_connected:
+                return
+            try:
+                return func(*args, **kwargs)
+            except Exception as exc:  # pylint: disable=broad-except
+                LOGGER.error(
+                    "Error while executing command on player %s: %s",
+                    self.name,
+                    str(exc),
+                )
+
+        self.mass.add_job(handle_command, func, *args, **kwargs)
index 1f74e2295e3acf4582d6bd33c915b49d15c6c96b..a0dec967977a11295fe2eff6a7942f14c8d48850 100644 (file)
@@ -318,32 +318,11 @@ class GroupPlayer(Player):
     async def subscribe_stream_client(self, child_player_id):
         """Handle streaming to all players of a group. Highly experimental."""
 
-        # each connected client gets its own sox process to convert incoming pcm samples
-        # to flac (which is streamed to the player).
-        args = [
-            "sox",
-            "-t",
-            "s32",
-            "-c",
-            "2",
-            "-r",
-            "96000",
-            "-",
-            "-t",
-            "flac",
-            "-C",
-            "0",
-            "-",
-        ]
-        sox_proc = await asyncio.create_subprocess_exec(
-            *args,
-            stdout=asyncio.subprocess.PIPE,
-            stdin=asyncio.subprocess.PIPE,
-        )
-        chunk_size = 2880000  # roughly 5 seconds of flac @ 96000/32
+        # each connected client gets its own Queue to which audio chunks (flac) are sent
         try:
             # report this client as connected
-            self.connected_clients[child_player_id] = sox_proc.stdin
+            queue = asyncio.Queue()
+            self.connected_clients[child_player_id] = queue
             LOGGER.debug(
                 "[%s] child player connected: %s",
                 self.player_id,
@@ -351,22 +330,16 @@ class GroupPlayer(Player):
             )
             # yield flac chunks from stdout to the http streamresponse
             while True:
-                try:
-                    chunk = await sox_proc.stdout.readexactly(chunk_size)
-                    yield chunk
-                except asyncio.IncompleteReadError as exc:
-                    chunk = exc.partial
-                    yield chunk
+                chunk = await queue.get()
+                yield chunk
+                queue.task_done()
+                if not chunk:
                     break
         except (GeneratorExit, Exception):  # pylint: disable=broad-except
             LOGGER.warning(
                 "[%s] child player aborted stream: %s", self.player_id, child_player_id
             )
             self.connected_clients.pop(child_player_id, None)
-            await sox_proc.communicate()
-            if sox_proc and sox_proc.returncode is None:
-                sox_proc.terminate()
-                await sox_proc.wait()
         else:
             self.connected_clients.pop(child_player_id, None)
             LOGGER.debug(
@@ -390,8 +363,8 @@ class GroupPlayer(Player):
         )
         self.sync_task = asyncio.create_task(self.__synchronize_players())
 
-        async for audio_chunk in self.mass.streams.async_queue_stream_pcm(
-            self.player_id, sample_rate=96000, bit_depth=32
+        async for audio_chunk in self.mass.streams.async_queue_stream_flac(
+            self.player_id
         ):
 
             # make sure we still have clients connected
@@ -401,9 +374,8 @@ class GroupPlayer(Player):
 
             # send the audio chunk to all connected players
             tasks = []
-            for _writer in self.connected_clients.values():
-                tasks.append(self.mass.add_job(_writer.write, audio_chunk))
-                tasks.append(self.mass.add_job(_writer.drain()))
+            for _queue in self.connected_clients.values():
+                tasks.append(self.mass.add_job(_queue.put(audio_chunk)))
             # wait for clients to consume the data
             await asyncio.wait(tasks)
 
index 380b14c9ec3d1572951f7b188bcfdcb46e9413db..88c271c8b1dedc3a1fdbe57e3a44174a948a1a19 100644 (file)
@@ -451,7 +451,7 @@ class SqueezeSocketClient:
         """Process incoming stat STMu message: Buffer underrun: Normal end of playback."""
         # pylint: disable=unused-argument
         LOGGER.debug("STMu received - end of playback.")
-        self.state = STATE_STOPPED
+        self._state = STATE_STOPPED
         self.signal_event(SqueezeEvent.STATE_UPDATED)
 
     def _process_resp(self, data):
index 54c0b94d992b4f87177377198ce0ac08c6ede4cd..d62307ed2bc73f3c2f873a65198b07d8efe36026 100644 (file)
@@ -25,7 +25,6 @@ async def stream_media(request: Request):
     resp = StreamResponse(
         status=200, reason="OK", headers={"Content-Type": f"audio/{content_type}"}
     )
-    resp.enable_chunked_encoding()
     await resp.prepare(request)
 
     # stream track
@@ -70,7 +69,6 @@ async def stream_queue_item(request: Request):
     resp = StreamResponse(
         status=200, reason="OK", headers={"Content-Type": "audio/flac"}
     )
-    resp.enable_chunked_encoding()
     await resp.prepare(request)
 
     async for audio_chunk in request.app["mass"].streams.async_stream_queue_item(
@@ -93,7 +91,6 @@ async def stream_group(request: Request):
     resp = StreamResponse(
         status=200, reason="OK", headers={"Content-Type": "audio/flac"}
     )
-    resp.enable_chunked_encoding()
     await resp.prepare(request)
 
     # stream queue