fix playback on squeezebox players
authorMarcel van der Veldt <m.vanderveldt@outlook.com>
Tue, 22 Dec 2020 20:50:32 +0000 (21:50 +0100)
committerMarcel van der Veldt <m.vanderveldt@outlook.com>
Tue, 22 Dec 2020 20:50:32 +0000 (21:50 +0100)
a larger buffer is required so we can buffer ahead

music_assistant/constants.py
music_assistant/helpers/process.py
music_assistant/managers/streams.py

index 930e0c1e6aa5bc48a185126ba4c6915aae34870c..889648841a0d16bc58e0ba500455195a14dc333c 100755 (executable)
@@ -1,6 +1,6 @@
 """All constants for Music Assistant."""
 
-__version__ = "0.0.76"
+__version__ = "0.0.77"
 REQUIRED_PYTHON_VER = "3.7"
 
 # configuration keys/attributes
index f50deb303ed5cc19977925a674a57e54dec1762d..49a3727bbce799d239db34ec6d6dd439c4fc3975 100644 (file)
@@ -18,10 +18,14 @@ from typing import AsyncGenerator, List, Optional
 
 LOGGER = logging.getLogger("AsyncProcess")
 
+DEFAULT_CHUNKSIZE = 1000000
+
 
 class AsyncProcess:
     """Implementation of a (truly) non blocking subprocess."""
 
+    # workaround that is compatible with uvloop
+
     def __init__(
         self,
         process_args: List,
@@ -34,6 +38,8 @@ class AsyncProcess:
             shell=enable_shell,
             stdout=subprocess.PIPE,
             stdin=subprocess.PIPE if enable_write else None,
+            # bufsize needs to be very high for smooth playback
+            bufsize=64000000,
         )
         self.loop = asyncio.get_running_loop()
         self._cancelled = False
@@ -47,12 +53,13 @@ class AsyncProcess:
         self._cancelled = True
         if await self.loop.run_in_executor(None, self._proc.poll) is None:
             # prevent subprocess deadlocking, send terminate and read remaining bytes
-            await self.loop.run_in_executor(None, self._proc.kill)
-            self.loop.run_in_executor(None, self.__read)
+            await self.loop.run_in_executor(None, self._proc.terminate)
+            await self.loop.run_in_executor(None, self.__read)
+        LOGGER.debug("process finished")
         del self._proc
 
     async def iterate_chunks(
-        self, chunksize: int = 512000
+        self, chunksize: int = DEFAULT_CHUNKSIZE
     ) -> AsyncGenerator[bytes, None]:
         """Yield chunks from the process stdout. Generator."""
         while True:
@@ -61,13 +68,13 @@ class AsyncProcess:
                 break
             yield chunk
 
-    async def read(self, chunksize: int = -1) -> bytes:
+    async def read(self, chunksize: int = DEFAULT_CHUNKSIZE) -> bytes:
         """Read x bytes from the process stdout."""
         if self._cancelled:
             raise asyncio.CancelledError()
         return await self.loop.run_in_executor(None, self.__read, chunksize)
 
-    def __read(self, chunksize: int = -1):
+    def __read(self, chunksize: int = DEFAULT_CHUNKSIZE):
         """Try read chunk from process."""
         try:
             return self._proc.stdout.read(chunksize)
@@ -111,3 +118,78 @@ class AsyncProcess:
             None, self._proc.communicate, input_data
         )
         return stdout
+
+
+class AsyncProcessBroken:
+    """Implementation of a (truly) non blocking subprocess."""
+
+    # this version is not compatible with uvloop
+
+    def __init__(self, process_args: List, enable_write: bool = False):
+        """Initialize."""
+        self._proc = None
+        self._process_args = process_args
+        self._enable_write = enable_write
+        self._cancelled = False
+
+    async def __aenter__(self) -> "AsyncProcess":
+        """Enter context manager."""
+        self._proc = await asyncio.create_subprocess_exec(
+            *self._process_args,
+            stdin=asyncio.subprocess.PIPE if self._enable_write else None,
+            stdout=asyncio.subprocess.PIPE,
+            limit=64000000
+        )
+        return self
+
+    async def __aexit__(self, exc_type, exc_value, traceback) -> bool:
+        """Exit context manager."""
+        self._cancelled = True
+        LOGGER.debug("subprocess exit requested")
+        if self._proc.returncode is None:
+            # prevent subprocess deadlocking, send terminate and read remaining bytes
+            if self._enable_write and self._proc.stdin.can_write_eof():
+                self._proc.stdin.write_eof()
+            self._proc.terminate()
+            await self._proc.stdout.read()
+        del self._proc
+        LOGGER.debug("subprocess exited")
+
+    async def iterate_chunks(
+        self, chunk_size: int = DEFAULT_CHUNKSIZE
+    ) -> AsyncGenerator[bytes, None]:
+        """Yield chunks from the process stdout. Generator."""
+        while True:
+            chunk = await self.read(chunk_size)
+            yield chunk
+            if len(chunk) < chunk_size:
+                break
+
+    async def read(self, chunk_size: int = DEFAULT_CHUNKSIZE) -> bytes:
+        """Read x bytes from the process stdout."""
+        if self._cancelled:
+            raise asyncio.CancelledError()
+        try:
+            return await self._proc.stdout.readexactly(chunk_size)
+        except asyncio.IncompleteReadError as err:
+            return err.partial
+
+    async def write(self, data: bytes) -> None:
+        """Write data to process stdin."""
+        if self._cancelled:
+            raise asyncio.CancelledError()
+        self._proc.stdin.write(data)
+        await self._proc.stdin.drain()
+
+    async def write_eof(self) -> None:
+        """Write eof to process."""
+        if self._cancelled:
+            raise asyncio.CancelledError()
+        if self._proc.stdin.can_write_eof():
+            self._proc.stdin.write_eof()
+
+    async def communicate(self, input_data: Optional[bytes] = None) -> bytes:
+        """Write bytes to process and read back results."""
+        if self._cancelled:
+            raise asyncio.CancelledError()
+        return await self._proc.communicate(input_data)
index 09141af8fd8ca0f40367acff265172746bb637a7..ebbb91bf2f294a628b4574b90b6304d3c9a599c4 100755 (executable)
@@ -54,7 +54,7 @@ class StreamManager:
         output_format: SoxOutputFormat = SoxOutputFormat.FLAC,
         resample: Optional[int] = None,
         gain_db_adjust: Optional[float] = None,
-        chunk_size: int = 512000,
+        chunk_size: int = 1000000,
     ) -> AsyncGenerator[Tuple[bool, bytes], None]:
         """Get the sox manipulated audio data for the given streamdetails."""
         # collect all args for sox
@@ -92,6 +92,7 @@ class StreamManager:
                 await sox_proc.write_eof()
 
             fill_buffer_task = self.mass.loop.create_task(fill_buffer())
+            await asyncio.sleep(1)
             # yield chunks from stdout
             # we keep 1 chunk behind to detect end of stream properly
             prev_chunk = b""
@@ -114,7 +115,6 @@ class StreamManager:
         """Stream the PlayerQueue's tracks as constant feed in flac format."""
         player_conf = self.mass.config.get_player_config(player_id)
         sample_rate = player_conf.get(CONF_MAX_SAMPLE_RATE, 96000)
-        chunk_size = sample_rate * 2 * 10
 
         args = [
             "sox",
@@ -144,7 +144,7 @@ class StreamManager:
             fill_buffer_task = self.mass.loop.create_task(fill_buffer())
 
             # start yielding audio chunks
-            async for chunk in sox_proc.iterate_chunks(chunk_size):
+            async for chunk in sox_proc.iterate_chunks(8000000):
                 yield chunk
             await asyncio.wait([fill_buffer_task])
 
@@ -341,7 +341,7 @@ class StreamManager:
         # start streaming
         LOGGER.debug("Start streaming %s (%s)", queue_item_id, queue_item.name)
         async for _, audio_chunk in self.async_get_sox_stream(
-            streamdetails, gain_db_adjust=gain_correct
+            streamdetails, gain_db_adjust=gain_correct, chunk_size=8000000
         ):
             yield audio_chunk
         LOGGER.debug("Finished streaming %s (%s)", queue_item_id, queue_item.name)
@@ -353,7 +353,6 @@ class StreamManager:
         stream_path = streamdetails.path
         stream_type = StreamType(streamdetails.type)
         audio_data = b""
-        chunk_size = 512000
         track_loudness = await self.mass.database.async_get_track_loudness(
             streamdetails.item_id, streamdetails.provider
         )
@@ -373,27 +372,25 @@ class StreamManager:
             streamdetails.item_id,
             streamdetails.type,
         )
-
+        # stream from URL
         if stream_type == StreamType.URL:
             async with self.mass.http_session.get(stream_path) as response:
-                async for chunk, _ in response.content.iter_chunks():
+                async for chunk in response.content.iter_chunks():
                     yield chunk
                     if needs_analyze and len(audio_data) < 100000000:
                         audio_data += chunk
+        # stream from file
         elif stream_type == StreamType.FILE:
             async with AIOFile(stream_path) as afp:
-                async for chunk in Reader(afp, chunk_size=chunk_size):
-                    if not chunk:
-                        break
+                async for chunk in Reader(afp):
                     yield chunk
                     if needs_analyze and len(audio_data) < 100000000:
                         audio_data += chunk
+        # stream from executable's stdout
         elif stream_type == StreamType.EXECUTABLE:
             args = shlex.split(stream_path)
             async with AsyncProcess(args) as process:
-                async for chunk in process.iterate_chunks(chunk_size):
-                    if not chunk:
-                        break
+                async for chunk in process.iterate_chunks():
                     yield chunk
                     if needs_analyze and len(audio_data) < 100000000:
                         audio_data += chunk
@@ -407,6 +404,8 @@ class StreamManager:
         )
 
         # send analyze job to background worker
+        # TODO: feed audio chunks to analyzer while streaming
+        # so we don't have to load this large chunk in memory
         if needs_analyze and audio_data:
             self.mass.add_job(self.__analyze_audio, streamdetails, audio_data)
 
@@ -466,7 +465,7 @@ async def async_crossfade_pcm_parts(
     args = ["sox", "-m", "-v", "1.0", "-t"] + pcm_args + [fadeoutfile.name, "-v", "1.0"]
     args += ["-t"] + pcm_args + [fadeinfile.name, "-t"] + pcm_args + ["-"]
     async with AsyncProcess(args, enable_write=False) as sox_proc:
-        crossfade_part = await sox_proc.communicate()
+        crossfade_part, _ = await sox_proc.communicate()
     fadeinfile.close()
     fadeoutfile.close()
     del fadeinfile
@@ -485,5 +484,5 @@ async def async_strip_silence(
     if reverse:
         args.append("reverse")
     async with AsyncProcess(args, enable_write=True) as sox_proc:
-        stripped_data = await sox_proc.communicate(audio_data)
+        stripped_data, _ = await sox_proc.communicate(audio_data)
     return stripped_data