fix queue streaming
authorMarcel van der Veldt <m.vanderveldt@outlook.com>
Wed, 23 Dec 2020 19:24:40 +0000 (20:24 +0100)
committerMarcel van der Veldt <m.vanderveldt@outlook.com>
Wed, 23 Dec 2020 19:24:40 +0000 (20:24 +0100)
music_assistant/constants.py
music_assistant/helpers/compare.py
music_assistant/helpers/process.py
music_assistant/managers/music.py
music_assistant/managers/streams.py

index 7584489bf9882262b818e7303e84b67307b1fcf5..7574aa438169a3480d140324478a0edac1e470c0 100755 (executable)
@@ -1,6 +1,6 @@
 """All constants for Music Assistant."""
 
-__version__ = "0.0.78"
+__version__ = "0.0.79"
 REQUIRED_PYTHON_VER = "3.7"
 
 # configuration keys/attributes
index b2f0dcfcd1d5df0e0f4c593c3a2a6d74209dec8d..fe3ae5a87f967fe553ce6e246df48426d699104c 100644 (file)
@@ -99,7 +99,7 @@ def compare_track(left_track: Track, right_track: Track):
     right_albums = right_track.albums or [right_track.album]
     if not (
         compare_albums(left_albums, right_albums)
-        or abs(left_track.duration - right_track.duration) <= 5
+        or abs(left_track.duration - right_track.duration) <= 3
     ):
         return False
     # 100% match, all criteria passed
index 49a3727bbce799d239db34ec6d6dd439c4fc3975..f7d99ee5d8c7b5b1ba3c11a0519ce5ec13aa3e44 100644 (file)
@@ -27,10 +27,7 @@ class AsyncProcess:
     # workaround that is compatible with uvloop
 
     def __init__(
-        self,
-        process_args: List,
-        enable_write: bool = False,
-        enable_shell=False,
+        self, process_args: List, enable_write: bool = False, enable_shell=False
     ):
         """Initialize."""
         self._proc = subprocess.Popen(
@@ -55,8 +52,8 @@ class AsyncProcess:
             # prevent subprocess deadlocking, send terminate and read remaining bytes
             await self.loop.run_in_executor(None, self._proc.terminate)
             await self.loop.run_in_executor(None, self.__read)
-        LOGGER.debug("process finished")
         del self._proc
+        return exc_type not in (GeneratorExit, asyncio.CancelledError)
 
     async def iterate_chunks(
         self, chunksize: int = DEFAULT_CHUNKSIZE
@@ -64,9 +61,10 @@ class AsyncProcess:
         """Yield chunks from the process stdout. Generator."""
         while True:
             chunk = await self.read(chunksize)
-            if not chunk:
-                break
             yield chunk
+            if len(chunk) < chunksize:
+                # last chunk
+                break
 
     async def read(self, chunksize: int = DEFAULT_CHUNKSIZE) -> bytes:
         """Read x bytes from the process stdout."""
@@ -114,10 +112,7 @@ class AsyncProcess:
         """Write bytes to process and read back results."""
         if self._cancelled:
             raise asyncio.CancelledError()
-        stdout, _ = await self.loop.run_in_executor(
-            None, self._proc.communicate, input_data
-        )
-        return stdout
+        return await self.loop.run_in_executor(None, self._proc.communicate, input_data)
 
 
 class AsyncProcessBroken:
index 99fccf4f119cb3d55b6ef985771cd65efeb8c378..d14c8cecd49fa6e5b03fce82b5b41a2dd4551913 100755 (executable)
@@ -597,7 +597,7 @@ class MusicManager:
                 full_track = media_item
             else:
                 full_track = await self.async_get_track(
-                    media_item.item_id, media_item.provider, refresh=True
+                    media_item.item_id, media_item.provider
                 )
             # sort by quality and check track availability
             for prov_media in sorted(
index b72bd7d38d07eb4fcb35970b6ece4ae3ca731595..23990576ebc42e9c8937945342333b44fc482ede 100755 (executable)
@@ -54,7 +54,7 @@ class StreamManager:
         output_format: SoxOutputFormat = SoxOutputFormat.FLAC,
         resample: Optional[int] = None,
         gain_db_adjust: Optional[float] = None,
-        chunk_size: int = 1000000,
+        chunk_size: int = 4000000,
     ) -> AsyncGenerator[Tuple[bool, bytes], None]:
         """Get the sox manipulated audio data for the given streamdetails."""
         # collect all args for sox
@@ -86,13 +86,10 @@ class StreamManager:
                 """Forward audio chunks to sox stdin."""
                 # feed audio data into sox stdin for processing
                 async for chunk in self.async_get_media_stream(streamdetails):
-                    if not chunk:
-                        break
                     await sox_proc.write(chunk)
                 await sox_proc.write_eof()
 
             fill_buffer_task = self.mass.loop.create_task(fill_buffer())
-            await asyncio.sleep(1)
             # yield chunks from stdout
             # we keep 1 chunk behind to detect end of stream properly
             prev_chunk = b""
@@ -137,8 +134,6 @@ class StreamManager:
                 async for chunk in self.async_queue_stream_pcm(
                     player_id, sample_rate, 32
                 ):
-                    if not chunk:
-                        break
                     await sox_proc.write(chunk)
 
             fill_buffer_task = self.mass.loop.create_task(fill_buffer())