Fix: Use watchdog instead of TimedAsyncGenerator for audio stream
authorMarcel van der Veldt <m.vanderveldt@outlook.com>
Tue, 7 Jan 2025 08:41:47 +0000 (09:41 +0100)
committerMarcel van der Veldt <m.vanderveldt@outlook.com>
Tue, 7 Jan 2025 08:41:47 +0000 (09:41 +0100)
music_assistant/helpers/audio.py
music_assistant/helpers/ffmpeg.py

index fe9c0ebc1a2c384275742a82eaf0bfcde70ba32a..5811fe073a8818185e7d6f1f6e4f89e5604a0ad1 100644 (file)
@@ -45,7 +45,7 @@ from .dsp import filter_to_ffmpeg_params
 from .ffmpeg import FFMpeg, get_ffmpeg_stream
 from .playlists import IsHLSPlaylist, PlaylistItem, fetch_playlist, parse_m3u
 from .process import AsyncProcess, check_output, communicate
-from .util import TimedAsyncGenerator, create_tempfile, detect_charset
+from .util import create_tempfile, detect_charset
 
 if TYPE_CHECKING:
     from music_assistant_models.config_entries import CoreConfig, PlayerConfig
@@ -307,9 +307,7 @@ async def get_media_stream(
     )
     try:
         await ffmpeg_proc.start()
-        async for chunk in TimedAsyncGenerator(
-            ffmpeg_proc.iter_chunked(pcm_format.pcm_sample_size), 300
-        ):
+        async for chunk in ffmpeg_proc.iter_chunked(pcm_format.pcm_sample_size):
             # for radio streams we just yield all chunks directly
             if streamdetails.media_type == MediaType.RADIO:
                 yield chunk
index 0405fc275011f42205bd54b1de655b8e9849b3f3..3dbab2d51cd0e74305192456a174d283188ef217 100644 (file)
@@ -15,7 +15,7 @@ from music_assistant_models.helpers import get_global_cache_value
 from music_assistant.constants import VERBOSE_LOG_LEVEL
 
 from .process import AsyncProcess
-from .util import TimedAsyncGenerator, close_async_generator
+from .util import close_async_generator
 
 if TYPE_CHECKING:
     from music_assistant_models.media_items import AudioFormat
@@ -115,7 +115,9 @@ class FFMpeg(AsyncProcess):
                     content_type_raw = line.split(": Audio: ")[1].split(" ")[0]
                     content_type = ContentType.try_parse(content_type_raw)
                     self.logger.debug(
-                        "Detected (input) content type: %s (%s)", content_type, content_type_raw
+                        "Detected (input) content type: %s (%s)",
+                        content_type,
+                        content_type_raw,
                     )
                     self.input_format.content_type = content_type
             del line
@@ -125,15 +127,25 @@ class FFMpeg(AsyncProcess):
         if TYPE_CHECKING:
             self.audio_input: AsyncGenerator[bytes, None]
         generator_exhausted = False
-        audio_received = False
+        audio_received = asyncio.Event()
+
+        async def stdin_watchdog() -> None:
+            # this is a simple watchdog to ensure we don't get stuck forever waiting for audio data
+            try:
+                await asyncio.wait_for(audio_received.wait(), timeout=300)
+            except TimeoutError:
+                self.logger.error("No audio data received from source after timeout")
+                self._stdin_task.cancel()
+
+        asyncio.create_task(stdin_watchdog())
+
         try:
-            async for chunk in TimedAsyncGenerator(self.audio_input, 300):
-                audio_received = True
-                if self.proc and self.proc.returncode is not None:
-                    raise AudioError("Parent process already exited")
+            async for chunk in self.audio_input:
+                if not audio_received.is_set():
+                    audio_received.set()
                 await self.write(chunk)
             generator_exhausted = True
-            if not audio_received:
+            if not audio_received.is_set():
                 raise AudioError("No audio data received from source")
         except Exception as err:
             if isinstance(err, asyncio.CancelledError):