From 8d6452977980d5331dd5f5f155686ec825f08ad7 Mon Sep 17 00:00:00 2001 From: Marcel van der Veldt Date: Tue, 22 Dec 2020 21:50:32 +0100 Subject: [PATCH] fix playback on squeezebox players a larger buffer is required so we can buffer ahead --- music_assistant/constants.py | 2 +- music_assistant/helpers/process.py | 92 +++++++++++++++++++++++++++-- music_assistant/managers/streams.py | 29 +++++---- 3 files changed, 102 insertions(+), 21 deletions(-) diff --git a/music_assistant/constants.py b/music_assistant/constants.py index 930e0c1e..88964884 100755 --- a/music_assistant/constants.py +++ b/music_assistant/constants.py @@ -1,6 +1,6 @@ """All constants for Music Assistant.""" -__version__ = "0.0.76" +__version__ = "0.0.77" REQUIRED_PYTHON_VER = "3.7" # configuration keys/attributes diff --git a/music_assistant/helpers/process.py b/music_assistant/helpers/process.py index f50deb30..49a3727b 100644 --- a/music_assistant/helpers/process.py +++ b/music_assistant/helpers/process.py @@ -18,10 +18,14 @@ from typing import AsyncGenerator, List, Optional LOGGER = logging.getLogger("AsyncProcess") +DEFAULT_CHUNKSIZE = 1000000 + class AsyncProcess: """Implementation of a (truly) non blocking subprocess.""" + # workaround that is compatible with uvloop + def __init__( self, process_args: List, @@ -34,6 +38,8 @@ class AsyncProcess: shell=enable_shell, stdout=subprocess.PIPE, stdin=subprocess.PIPE if enable_write else None, + # bufsize needs to be very high for smooth playback + bufsize=64000000, ) self.loop = asyncio.get_running_loop() self._cancelled = False @@ -47,12 +53,13 @@ class AsyncProcess: self._cancelled = True if await self.loop.run_in_executor(None, self._proc.poll) is None: # prevent subprocess deadlocking, send terminate and read remaining bytes - await self.loop.run_in_executor(None, self._proc.kill) - self.loop.run_in_executor(None, self.__read) + await self.loop.run_in_executor(None, self._proc.terminate) + await self.loop.run_in_executor(None, self.__read) + LOGGER.debug("process finished") del self._proc async def iterate_chunks( - self, chunksize: int = 512000 + self, chunksize: int = DEFAULT_CHUNKSIZE ) -> AsyncGenerator[bytes, None]: """Yield chunks from the process stdout. Generator.""" while True: @@ -61,13 +68,13 @@ class AsyncProcess: break yield chunk - async def read(self, chunksize: int = -1) -> bytes: + async def read(self, chunksize: int = DEFAULT_CHUNKSIZE) -> bytes: """Read x bytes from the process stdout.""" if self._cancelled: raise asyncio.CancelledError() return await self.loop.run_in_executor(None, self.__read, chunksize) - def __read(self, chunksize: int = -1): + def __read(self, chunksize: int = DEFAULT_CHUNKSIZE): """Try read chunk from process.""" try: return self._proc.stdout.read(chunksize) @@ -111,3 +118,78 @@ class AsyncProcess: None, self._proc.communicate, input_data ) return stdout + + +class AsyncProcessBroken: + """Implementation of a (truly) non blocking subprocess.""" + + # this version is not compatible with uvloop + + def __init__(self, process_args: List, enable_write: bool = False): + """Initialize.""" + self._proc = None + self._process_args = process_args + self._enable_write = enable_write + self._cancelled = False + + async def __aenter__(self) -> "AsyncProcess": + """Enter context manager.""" + self._proc = await asyncio.create_subprocess_exec( + *self._process_args, + stdin=asyncio.subprocess.PIPE if self._enable_write else None, + stdout=asyncio.subprocess.PIPE, + limit=64000000 + ) + return self + + async def __aexit__(self, exc_type, exc_value, traceback) -> bool: + """Exit context manager.""" + self._cancelled = True + LOGGER.debug("subprocess exit requested") + if self._proc.returncode is None: + # prevent subprocess deadlocking, send terminate and read remaining bytes + if self._enable_write and self._proc.stdin.can_write_eof(): + self._proc.stdin.write_eof() + self._proc.terminate() + await self._proc.stdout.read() + del self._proc + LOGGER.debug("subprocess exited") + + async def iterate_chunks( + self, chunk_size: int = DEFAULT_CHUNKSIZE + ) -> AsyncGenerator[bytes, None]: + """Yield chunks from the process stdout. Generator.""" + while True: + chunk = await self.read(chunk_size) + yield chunk + if len(chunk) < chunk_size: + break + + async def read(self, chunk_size: int = DEFAULT_CHUNKSIZE) -> bytes: + """Read x bytes from the process stdout.""" + if self._cancelled: + raise asyncio.CancelledError() + try: + return await self._proc.stdout.readexactly(chunk_size) + except asyncio.IncompleteReadError as err: + return err.partial + + async def write(self, data: bytes) -> None: + """Write data to process stdin.""" + if self._cancelled: + raise asyncio.CancelledError() + self._proc.stdin.write(data) + await self._proc.stdin.drain() + + async def write_eof(self) -> None: + """Write eof to process.""" + if self._cancelled: + raise asyncio.CancelledError() + if self._proc.stdin.can_write_eof(): + self._proc.stdin.write_eof() + + async def communicate(self, input_data: Optional[bytes] = None) -> bytes: + """Write bytes to process and read back results.""" + if self._cancelled: + raise asyncio.CancelledError() + return await self._proc.communicate(input_data) diff --git a/music_assistant/managers/streams.py b/music_assistant/managers/streams.py index 09141af8..ebbb91bf 100755 --- a/music_assistant/managers/streams.py +++ b/music_assistant/managers/streams.py @@ -54,7 +54,7 @@ class StreamManager: output_format: SoxOutputFormat = SoxOutputFormat.FLAC, resample: Optional[int] = None, gain_db_adjust: Optional[float] = None, - chunk_size: int = 512000, + chunk_size: int = 1000000, ) -> AsyncGenerator[Tuple[bool, bytes], None]: """Get the sox manipulated audio data for the given streamdetails.""" # collect all args for sox @@ -92,6 +92,7 @@ class StreamManager: await sox_proc.write_eof() fill_buffer_task = self.mass.loop.create_task(fill_buffer()) + await asyncio.sleep(1) # yield chunks from stdout # we keep 1 chunk behind to detect end of stream properly prev_chunk = b"" @@ -114,7 +115,6 @@ class StreamManager: """Stream the PlayerQueue's tracks as constant feed in flac format.""" player_conf = self.mass.config.get_player_config(player_id) sample_rate = player_conf.get(CONF_MAX_SAMPLE_RATE, 96000) - chunk_size = sample_rate * 2 * 10 args = [ "sox", @@ -144,7 +144,7 @@ class StreamManager: fill_buffer_task = self.mass.loop.create_task(fill_buffer()) # start yielding audio chunks - async for chunk in sox_proc.iterate_chunks(chunk_size): + async for chunk in sox_proc.iterate_chunks(8000000): yield chunk await asyncio.wait([fill_buffer_task]) @@ -341,7 +341,7 @@ class StreamManager: # start streaming LOGGER.debug("Start streaming %s (%s)", queue_item_id, queue_item.name) async for _, audio_chunk in self.async_get_sox_stream( - streamdetails, gain_db_adjust=gain_correct + streamdetails, gain_db_adjust=gain_correct, chunk_size=8000000 ): yield audio_chunk LOGGER.debug("Finished streaming %s (%s)", queue_item_id, queue_item.name) @@ -353,7 +353,6 @@ class StreamManager: stream_path = streamdetails.path stream_type = StreamType(streamdetails.type) audio_data = b"" - chunk_size = 512000 track_loudness = await self.mass.database.async_get_track_loudness( streamdetails.item_id, streamdetails.provider ) @@ -373,27 +372,25 @@ class StreamManager: streamdetails.item_id, streamdetails.type, ) - + # stream from URL if stream_type == StreamType.URL: async with self.mass.http_session.get(stream_path) as response: - async for chunk, _ in response.content.iter_chunks(): + async for chunk in response.content.iter_chunks(): yield chunk if needs_analyze and len(audio_data) < 100000000: audio_data += chunk + # stream from file elif stream_type == StreamType.FILE: async with AIOFile(stream_path) as afp: - async for chunk in Reader(afp, chunk_size=chunk_size): - if not chunk: - break + async for chunk in Reader(afp): yield chunk if needs_analyze and len(audio_data) < 100000000: audio_data += chunk + # stream from executable's stdout elif stream_type == StreamType.EXECUTABLE: args = shlex.split(stream_path) async with AsyncProcess(args) as process: - async for chunk in process.iterate_chunks(chunk_size): - if not chunk: - break + async for chunk in process.iterate_chunks(): yield chunk if needs_analyze and len(audio_data) < 100000000: audio_data += chunk @@ -407,6 +404,8 @@ class StreamManager: ) # send analyze job to background worker + # TODO: feed audio chunks to analyzer while streaming + # so we don't have to load this large chunk in memory if needs_analyze and audio_data: self.mass.add_job(self.__analyze_audio, streamdetails, audio_data) @@ -466,7 +465,7 @@ async def async_crossfade_pcm_parts( args = ["sox", "-m", "-v", "1.0", "-t"] + pcm_args + [fadeoutfile.name, "-v", "1.0"] args += ["-t"] + pcm_args + [fadeinfile.name, "-t"] + pcm_args + ["-"] async with AsyncProcess(args, enable_write=False) as sox_proc: - crossfade_part = await sox_proc.communicate() + crossfade_part, _ = await sox_proc.communicate() fadeinfile.close() fadeoutfile.close() del fadeinfile @@ -485,5 +484,5 @@ async def async_strip_silence( if reverse: args.append("reverse") async with AsyncProcess(args, enable_write=True) as sox_proc: - stripped_data = await sox_proc.communicate(audio_data) + stripped_data, _ = await sox_proc.communicate(audio_data) return stripped_data -- 2.34.1