fix buffer issues
authorMarcel van der Veldt <m.vanderveldt@outlook.com>
Fri, 12 Apr 2024 08:40:23 +0000 (10:40 +0200)
committerMarcel van der Veldt <m.vanderveldt@outlook.com>
Fri, 12 Apr 2024 08:40:23 +0000 (10:40 +0200)
music_assistant/server/controllers/streams.py
music_assistant/server/helpers/audio.py
music_assistant/server/helpers/process.py

index 66c7835bb0f96d2dc8e84bee14d00f3c761c58a2..30f4814463271862b64b0c196fa848d26d9aeefe 100644 (file)
@@ -19,12 +19,7 @@ from typing import TYPE_CHECKING
 from aiofiles.os import wrap
 from aiohttp import web
 
-from music_assistant.common.helpers.util import (
-    get_ip,
-    select_free_port,
-    try_parse_bool,
-    try_parse_duration,
-)
+from music_assistant.common.helpers.util import get_ip, select_free_port, try_parse_bool
 from music_assistant.common.models.config_entries import (
     ConfigEntry,
     ConfigValueOption,
@@ -49,7 +44,6 @@ from music_assistant.server.helpers.audio import (
     FFMpeg,
     check_audio_support,
     crossfade_pcm_parts,
-    get_chunksize,
     get_ffmpeg_stream,
     get_hls_stream,
     get_icy_stream,
@@ -111,8 +105,7 @@ class StreamsController(CoreController):
         self.manifest.name = "Streamserver"
         self.manifest.description = (
             "Music Assistant's core controller that is responsible for "
-            "streaming audio to players on the local network as well as "
-            "some player specific local control callbacks."
+            "streaming audio to players on the local network."
         )
         self.manifest.icon = "cast-audio"
         self.announcements: dict[str, str] = {}
@@ -288,10 +281,25 @@ class StreamsController(CoreController):
             queue.display_name,
         )
         queue.index_in_buffer = self.mass.player_queues.index_by_id(queue_id, queue_item_id)
-        async for chunk in self.get_media_stream(
-            streamdetails=queue_item.streamdetails,
+        pcm_format = AudioFormat(
+            content_type=ContentType.from_bit_depth(output_format.bit_depth),
+            sample_rate=queue_item.streamdetails.audio_format.sample_rate,
+            bit_depth=queue_item.streamdetails.audio_format.bit_depth,
+            channels=2,
+        )
+        async for chunk in get_ffmpeg_stream(
+            audio_input=self.get_media_stream(
+                streamdetails=queue_item.streamdetails,
+                pcm_format=pcm_format,
+            ),
+            input_format=pcm_format,
             output_format=output_format,
-            extra_filter_params=get_player_filter_params(self.mass, queue_player.player_id),
+            filter_params=get_player_filter_params(self.mass, queue_player.player_id),
+            extra_input_args=[
+                # use readrate to limit buffering ahead too much
+                "-readrate",
+                "1.2",
+            ],
         ):
             try:
                 await resp.write(chunk)
@@ -364,6 +372,11 @@ class StreamsController(CoreController):
             output_format=output_format,
             filter_params=get_player_filter_params(self.mass, queue_player.player_id),
             chunk_size=icy_meta_interval if enable_icy else None,
+            extra_input_args=[
+                # use readrate to limit buffering ahead too much
+                "-readrate",
+                "1.2",
+            ],
         ):
             try:
                 await resp.write(chunk)
@@ -545,7 +558,7 @@ class StreamsController(CoreController):
             # handle incoming audio chunks
             async for chunk in self.get_media_stream(
                 queue_track.streamdetails,
-                output_format=pcm_format,
+                pcm_format=pcm_format,
             ):
                 # buffer size needs to be big enough to include the crossfade part
                 # allow it to be a bit smaller when playback just starts
@@ -688,16 +701,15 @@ class StreamsController(CoreController):
     async def get_media_stream(
         self,
         streamdetails: StreamDetails,
-        output_format: AudioFormat,
-        extra_filter_params: list[str] | None = None,
+        pcm_format: AudioFormat,
     ) -> AsyncGenerator[tuple[bool, bytes], None]:
-        """Get the audio stream for the given streamdetails."""
+        """Get the audio stream for the given streamdetails as raw pcm chunks."""
         logger = self.logger.getChild("media_stream")
         is_radio = streamdetails.media_type == MediaType.RADIO or not streamdetails.duration
         if is_radio:
             streamdetails.seek_position = 0
         # collect all arguments for ffmpeg
-        filter_params = extra_filter_params or []
+        filter_params = []
         if streamdetails.target_loudness is not None:
             # add loudnorm filters
             filter_rule = f"loudnorm=I={streamdetails.target_loudness}:TP=-1.5:LRA=11"
@@ -734,20 +746,20 @@ class StreamsController(CoreController):
         async with FFMpeg(
             audio_input=audio_source,
             input_format=streamdetails.audio_format,
-            output_format=output_format,
+            output_format=pcm_format,
             filter_params=filter_params,
             extra_input_args=[
                 *extra_input_args,
                 # we criple ffmpeg a bit on purpose with the filter_threads
                 # option so it doesn't consume all cpu when calculating loudnorm
                 "-filter_threads",
-                "1",
+                "2",
             ],
             collect_log_history=True,
             logger=logger,
         ) as ffmpeg_proc:
             try:
-                async for chunk in ffmpeg_proc.iter_any(get_chunksize(output_format)):
+                async for chunk in ffmpeg_proc.iter_any(pcm_format.pcm_sample_size):
                     bytes_sent += len(chunk)
                     yield chunk
                     del chunk
@@ -759,15 +771,7 @@ class StreamsController(CoreController):
                     await ffmpeg_proc.close()
 
                 # try to determine how many seconds we've streamed
-                seconds_streamed = 0
-                if output_format.content_type.is_pcm():
-                    seconds_streamed = (
-                        bytes_sent / output_format.pcm_sample_size if bytes_sent else 0
-                    )
-                elif line := next((x for x in ffmpeg_proc.log_history if "time=" in x), None):
-                    duration_str = line.split("time=")[1].split(" ")[0]
-                    seconds_streamed = try_parse_duration(duration_str)
-
+                seconds_streamed = bytes_sent / pcm_format.pcm_sample_size if bytes_sent else 0
                 logger.debug(
                     "stream %s (with code %s) for %s - seconds streamed: %s",
                     "finished" if finished else "aborted",
@@ -775,7 +779,6 @@ class StreamsController(CoreController):
                     streamdetails.uri,
                     seconds_streamed,
                 )
-
                 if seconds_streamed:
                     streamdetails.seconds_streamed = seconds_streamed
                 # store accurate duration
@@ -797,9 +800,7 @@ class StreamsController(CoreController):
                                 streamdetails.item_id, streamdetails.provider, loudness_details
                             )
                         )
-
                 # report playback
-                # TODO: Move this to the queue controller ?
                 if finished or seconds_streamed > 30:
                     self.mass.create_task(
                         self.mass.music.mark_item_played(
index 6ea68932bf08793257898f1ce81e95adc4a4745e..8dfa838ff0ca884bf9b9e017c94043c0f9bb73a5 100644 (file)
@@ -959,6 +959,7 @@ def get_ffmpeg_args(
         input_args += ["-f", input_format.content_type.value, "-i", input_path]
 
     # collect output args
+    output_args = []
     if output_path.upper() == "NULL":
         # devnull stream
         output_args = ["-f", "null", "-"]
@@ -966,8 +967,10 @@ def get_ffmpeg_args(
         # use wav so we at least have some headers for the rest of the chain
         output_args = ["-f", "wav", output_path]
     else:
+        if output_format.content_type.is_pcm():
+            output_args += ["-acodec", output_format.content_type.name.lower()]
         # use explicit format identifier for all other
-        output_args = [
+        output_args += [
             "-f",
             output_format.content_type.value,
             "-ar",
@@ -976,8 +979,6 @@ def get_ffmpeg_args(
             str(output_format.channels),
             output_path,
         ]
-        if output_format.content_type.is_pcm():
-            output_args += ["-acodec", output_format.content_type.name.lower()]
 
     # determine if we need to do resampling
     if (
index 3d1be5c3303cb80d81cbde59c9172457796e47c4..466fe4b65418db9d5bfee7667e13b929a7f4f3b5 100644 (file)
@@ -109,20 +109,22 @@ class AsyncProcess:
         """Yield chunks of n size from the process stdout."""
         while True:
             chunk = await self.readexactly(n)
-            yield chunk
             if len(chunk) == 0:
                 break
+            yield chunk
 
     async def iter_any(self, n: int = DEFAULT_CHUNKSIZE) -> AsyncGenerator[bytes, None]:
         """Yield chunks as they come in from process stdout."""
         while True:
             chunk = await self.read(n)
-            yield chunk
             if len(chunk) == 0:
                 break
+            yield chunk
 
     async def readexactly(self, n: int) -> bytes:
         """Read exactly n bytes from the process stdout (or less if eof)."""
+        if self._close_called:
+            return b""
         try:
             return await self.proc.stdout.readexactly(n)
         except asyncio.IncompleteReadError as err:
@@ -135,6 +137,8 @@ class AsyncProcess:
         and may return less or equal bytes than requested, but at least one byte.
         If EOF was received before any byte is read, this function returns empty byte object.
         """
+        if self._close_called:
+            return b""
         return await self.proc.stdout.read(n)
 
     async def write(self, data: bytes) -> None:
@@ -165,7 +169,7 @@ class AsyncProcess:
 
     async def read_stderr(self) -> bytes:
         """Read line from stderr."""
-        if self.closed:
+        if self._close_called:
             return b""
         try:
             return await self.proc.stderr.readline()
@@ -199,12 +203,15 @@ class AsyncProcess:
         if self.proc.stdin and not self.proc.stdin.is_closing():
             self.proc.stdin.close()
         # abort existing readers on stderr/stdout first before we send communicate
-        if self.proc.stdout and self.proc.stdout._waiter is not None:
-            with suppress(asyncio.exceptions.InvalidStateError):
-                self.proc.stdout._waiter.set_exception(asyncio.CancelledError())
-        if self.proc.stderr and self.proc.stderr._waiter is not None:
-            with suppress(asyncio.exceptions.InvalidStateError):
-                self.proc.stderr._waiter.set_exception(asyncio.CancelledError())
+        waiter: asyncio.Future
+        if self.proc.stdout and (waiter := self.proc.stdout._waiter):
+            self.proc.stdout._waiter = None
+            if waiter and not waiter.done():
+                waiter.set_exception(asyncio.CancelledError())
+        if self.proc.stderr and (waiter := self.proc.stderr._waiter):
+            self.proc.stderr._waiter = None
+            if waiter and not waiter.done():
+                waiter.set_exception(asyncio.CancelledError())
         await asyncio.sleep(0)  # yield to loop
 
         # make sure the process is really cleaned up.