some tweaks to stream to handle edge cases
authorMarcel van der Veldt <m.vanderveldt@outlook.com>
Sun, 2 Nov 2025 02:55:05 +0000 (03:55 +0100)
committerMarcel van der Veldt <m.vanderveldt@outlook.com>
Sun, 2 Nov 2025 02:55:05 +0000 (03:55 +0100)
music_assistant/controllers/streams.py
music_assistant/helpers/audio.py
music_assistant/helpers/ffmpeg.py
music_assistant/helpers/process.py

index fc3d81db3254783278c7fb141587b2cf9e566a58..be42446f0c295378bd7b07d1ad2ff87f55a19758 100644 (file)
@@ -449,7 +449,8 @@ class StreamsController(CoreController):
         ):
             # crossfade is not supported on this player due to missing gapless playback
             self.logger.warning(
-                "Crossfade disabled: Player %s does not support gapless playback",
+                "Crossfade disabled: Player %s does not support gapless playback, "
+                "consider enabling flow mode to enable crossfade on this player.",
                 queue_player.display_name if queue_player else "Unknown Player",
             )
             smart_fades_mode = SmartFadesMode.DISABLED
@@ -493,7 +494,7 @@ class StreamsController(CoreController):
         if queue_item.media_type == MediaType.RADIO:
             # keep very short buffer for radio streams
             # to keep them (more or less) realtime and prevent time outs
-            read_rate_input_args = ["-readrate", "1.00", "-readrate_initial_burst", "1"]
+            read_rate_input_args = ["-readrate", "1.01", "-readrate_initial_burst", "3"]
         elif "Network_Module" in user_agent or "transferMode.dlna.org" in request.headers:
             # and ofcourse we have an exception of the exception. Where most players actually NEED
             # the readrate filter to avoid disconnecting, some other players (DLNA/MusicCast)
index 0d42d357b8f3a98de9329478128bfa6d37064e16..0b37b25304ab792ac7daa7e1a8df1fba6d93e26d 100644 (file)
@@ -668,15 +668,15 @@ async def get_media_stream(
         elif ffmpeg_proc.returncode not in (0, None):
             raise AudioError(f"FFMpeg exited with code {ffmpeg_proc.returncode}")
         finished = True
-    except (Exception, GeneratorExit) as err:
+    except (Exception, GeneratorExit, asyncio.CancelledError) as err:
         if isinstance(err, asyncio.CancelledError | GeneratorExit):
             # we were cancelled, just raise
             cancelled = True
-            raise
         logger.error("Error while streaming %s: %s", streamdetails.uri, err)
         # dump the last 10 lines of the log in case of an unclean exit
         logger.warning("\n".join(list(ffmpeg_proc.log_history)[-10:]))
         streamdetails.stream_error = True
+        raise
     finally:
         # always ensure close is called which also handles all cleanup
         await ffmpeg_proc.close()
index 38577a8d06d43fe8c6d21e8b7d7ccb7d81bc9a40..2ae882ccddc4933e1572ef6774e0494226ad6ac1 100644 (file)
@@ -103,9 +103,14 @@ class FFMpeg(AsyncProcess):
         timeout: float | None = None,
     ) -> tuple[bytes, bytes]:
         """Override communicate to avoid blocking."""
-        if self._stdin_task and not self._stdin_task.done():
-            self._stdin_task.cancel()
-            with suppress(asyncio.CancelledError):
+        if self._stdin_task:
+            if not self._stdin_task.done():
+                self._stdin_task.cancel()
+            # Always await the task to consume any exception and prevent
+            # "Task exception was never retrieved" errors.
+            # Suppress CancelledError (from cancel) and any other exception
+            # since exceptions have already been propagated through the generator chain.
+            with suppress(asyncio.CancelledError, Exception):
                 await self._stdin_task
         if self._logger_task and not self._logger_task.done():
             self._logger_task.cancel()
@@ -115,9 +120,14 @@ class FFMpeg(AsyncProcess):
         """Close/terminate the process and wait for exit."""
         if self.closed:
             return
-        if self._stdin_task and not self._stdin_task.done():
-            self._stdin_task.cancel()
-            with suppress(asyncio.CancelledError):
+        if self._stdin_task:
+            if not self._stdin_task.done():
+                self._stdin_task.cancel()
+            # Always await the task to consume any exception and prevent
+            # "Task exception was never retrieved" errors.
+            # Suppress CancelledError (from cancel) and any other exception
+            # since exceptions have already been propagated through the generator chain.
+            with suppress(asyncio.CancelledError, Exception):
                 await self._stdin_task
         await super().close(send_signal)
         if self._logger_task and not self._logger_task.done():
index ff65ca7323b96a43e121c332fd8e2ac0d391daf1..499f7fc482c5ae512c27c1037877cdd633337b3a 100644 (file)
@@ -234,15 +234,17 @@ class AsyncProcess:
         if self.proc.stdin and not self.proc.stdin.is_closing():
             self.proc.stdin.close()
         # abort existing readers on stderr/stdout first before we send communicate
-        waiter: asyncio.Future[None]
-        if self.proc.stdout and (waiter := self.proc.stdout._waiter):  # type: ignore[attr-defined]
-            self.proc.stdout._waiter = None  # type: ignore[attr-defined]
-            if waiter and not waiter.done():
-                waiter.set_exception(asyncio.CancelledError())
-        if self.proc.stderr and (waiter := self.proc.stderr._waiter):  # type: ignore[attr-defined]
-            self.proc.stderr._waiter = None  # type: ignore[attr-defined]
-            if waiter and not waiter.done():
-                waiter.set_exception(asyncio.CancelledError())
+        # waiter: asyncio.Future[None]
+        # stdout_waiter = self.proc.stdout._waiter  # type: ignore[attr-defined]
+        # if self.proc.stdout and stdout_waiter:
+        #     self.proc.stdout._waiter = None  # type: ignore[attr-defined]
+        #     if stdout_waiter and not stdout_waiter.done():
+        #         stdout_waiter.set_exception(asyncio.CancelledError())
+        # stderr_waiter = self.proc.stderr._waiter  # type: ignore[attr-defined]
+        # if self.proc.stderr and stderr_waiter:
+        #     self.proc.stderr._waiter = None  # type: ignore[attr-defined]
+        #     if stderr_waiter and not stderr_waiter.done():
+        #         stderr_waiter.set_exception(asyncio.CancelledError())
         await asyncio.sleep(0)  # yield to loop
 
         # make sure the process is really cleaned up.