From: Marcel van der Veldt Date: Wed, 23 Dec 2020 19:24:40 +0000 (+0100) Subject: fix queue streaming X-Git-Url: https://git.kitaultman.com/?a=commitdiff_plain;h=4b12eb0455ef5a8415acf7bcb5db352090d13d84;p=music-assistant-server.git fix queue streaming --- diff --git a/music_assistant/constants.py b/music_assistant/constants.py index 7584489b..7574aa43 100755 --- a/music_assistant/constants.py +++ b/music_assistant/constants.py @@ -1,6 +1,6 @@ """All constants for Music Assistant.""" -__version__ = "0.0.78" +__version__ = "0.0.79" REQUIRED_PYTHON_VER = "3.7" # configuration keys/attributes diff --git a/music_assistant/helpers/compare.py b/music_assistant/helpers/compare.py index b2f0dcfc..fe3ae5a8 100644 --- a/music_assistant/helpers/compare.py +++ b/music_assistant/helpers/compare.py @@ -99,7 +99,7 @@ def compare_track(left_track: Track, right_track: Track): right_albums = right_track.albums or [right_track.album] if not ( compare_albums(left_albums, right_albums) - or abs(left_track.duration - right_track.duration) <= 5 + or abs(left_track.duration - right_track.duration) <= 3 ): return False # 100% match, all criteria passed diff --git a/music_assistant/helpers/process.py b/music_assistant/helpers/process.py index 49a3727b..f7d99ee5 100644 --- a/music_assistant/helpers/process.py +++ b/music_assistant/helpers/process.py @@ -27,10 +27,7 @@ class AsyncProcess: # workaround that is compatible with uvloop def __init__( - self, - process_args: List, - enable_write: bool = False, - enable_shell=False, + self, process_args: List, enable_write: bool = False, enable_shell=False ): """Initialize.""" self._proc = subprocess.Popen( @@ -55,8 +52,8 @@ class AsyncProcess: # prevent subprocess deadlocking, send terminate and read remaining bytes await self.loop.run_in_executor(None, self._proc.terminate) await self.loop.run_in_executor(None, self.__read) - LOGGER.debug("process finished") del self._proc + return exc_type not in (GeneratorExit, asyncio.CancelledError) async def iterate_chunks( self, chunksize: int = DEFAULT_CHUNKSIZE @@ -64,9 +61,10 @@ class AsyncProcess: """Yield chunks from the process stdout. Generator.""" while True: chunk = await self.read(chunksize) - if not chunk: - break yield chunk + if len(chunk) < chunksize: + # last chunk + break async def read(self, chunksize: int = DEFAULT_CHUNKSIZE) -> bytes: """Read x bytes from the process stdout.""" @@ -114,10 +112,7 @@ class AsyncProcess: """Write bytes to process and read back results.""" if self._cancelled: raise asyncio.CancelledError() - stdout, _ = await self.loop.run_in_executor( - None, self._proc.communicate, input_data - ) - return stdout + return await self.loop.run_in_executor(None, self._proc.communicate, input_data) class AsyncProcessBroken: diff --git a/music_assistant/managers/music.py b/music_assistant/managers/music.py index 99fccf4f..d14c8cec 100755 --- a/music_assistant/managers/music.py +++ b/music_assistant/managers/music.py @@ -597,7 +597,7 @@ class MusicManager: full_track = media_item else: full_track = await self.async_get_track( - media_item.item_id, media_item.provider, refresh=True + media_item.item_id, media_item.provider ) # sort by quality and check track availability for prov_media in sorted( diff --git a/music_assistant/managers/streams.py b/music_assistant/managers/streams.py index b72bd7d3..23990576 100755 --- a/music_assistant/managers/streams.py +++ b/music_assistant/managers/streams.py @@ -54,7 +54,7 @@ class StreamManager: output_format: SoxOutputFormat = SoxOutputFormat.FLAC, resample: Optional[int] = None, gain_db_adjust: Optional[float] = None, - chunk_size: int = 1000000, + chunk_size: int = 4000000, ) -> AsyncGenerator[Tuple[bool, bytes], None]: """Get the sox manipulated audio data for the given streamdetails.""" # collect all args for sox @@ -86,13 +86,10 @@ class StreamManager: """Forward audio chunks to sox stdin.""" # feed audio data into sox stdin for processing async for chunk in self.async_get_media_stream(streamdetails): - if not chunk: - break await sox_proc.write(chunk) await sox_proc.write_eof() fill_buffer_task = self.mass.loop.create_task(fill_buffer()) - await asyncio.sleep(1) # yield chunks from stdout # we keep 1 chunk behind to detect end of stream properly prev_chunk = b"" @@ -137,8 +134,6 @@ class StreamManager: async for chunk in self.async_queue_stream_pcm( player_id, sample_rate, 32 ): - if not chunk: - break await sox_proc.write(chunk) fill_buffer_task = self.mass.loop.create_task(fill_buffer())