From 66ca91577e48c3164bc492254ed46874d79adc66 Mon Sep 17 00:00:00 2001 From: Marcel van der Veldt Date: Tue, 7 Jan 2025 09:41:47 +0100 Subject: [PATCH] Fix: Use watchdog instead of TimedAsyncGenerator for audio stream --- music_assistant/helpers/audio.py | 6 ++---- music_assistant/helpers/ffmpeg.py | 28 ++++++++++++++++++++-------- 2 files changed, 22 insertions(+), 12 deletions(-) diff --git a/music_assistant/helpers/audio.py b/music_assistant/helpers/audio.py index fe9c0ebc..5811fe07 100644 --- a/music_assistant/helpers/audio.py +++ b/music_assistant/helpers/audio.py @@ -45,7 +45,7 @@ from .dsp import filter_to_ffmpeg_params from .ffmpeg import FFMpeg, get_ffmpeg_stream from .playlists import IsHLSPlaylist, PlaylistItem, fetch_playlist, parse_m3u from .process import AsyncProcess, check_output, communicate -from .util import TimedAsyncGenerator, create_tempfile, detect_charset +from .util import create_tempfile, detect_charset if TYPE_CHECKING: from music_assistant_models.config_entries import CoreConfig, PlayerConfig @@ -307,9 +307,7 @@ async def get_media_stream( ) try: await ffmpeg_proc.start() - async for chunk in TimedAsyncGenerator( - ffmpeg_proc.iter_chunked(pcm_format.pcm_sample_size), 300 - ): + async for chunk in ffmpeg_proc.iter_chunked(pcm_format.pcm_sample_size): # for radio streams we just yield all chunks directly if streamdetails.media_type == MediaType.RADIO: yield chunk diff --git a/music_assistant/helpers/ffmpeg.py b/music_assistant/helpers/ffmpeg.py index 0405fc27..3dbab2d5 100644 --- a/music_assistant/helpers/ffmpeg.py +++ b/music_assistant/helpers/ffmpeg.py @@ -15,7 +15,7 @@ from music_assistant_models.helpers import get_global_cache_value from music_assistant.constants import VERBOSE_LOG_LEVEL from .process import AsyncProcess -from .util import TimedAsyncGenerator, close_async_generator +from .util import close_async_generator if TYPE_CHECKING: from music_assistant_models.media_items import AudioFormat @@ -115,7 +115,9 @@ class FFMpeg(AsyncProcess): content_type_raw = line.split(": Audio: ")[1].split(" ")[0] content_type = ContentType.try_parse(content_type_raw) self.logger.debug( - "Detected (input) content type: %s (%s)", content_type, content_type_raw + "Detected (input) content type: %s (%s)", + content_type, + content_type_raw, ) self.input_format.content_type = content_type del line @@ -125,15 +127,25 @@ class FFMpeg(AsyncProcess): if TYPE_CHECKING: self.audio_input: AsyncGenerator[bytes, None] generator_exhausted = False - audio_received = False + audio_received = asyncio.Event() + + async def stdin_watchdog() -> None: + # this is a simple watchdog to ensure we don't get stuck forever waiting for audio data + try: + await asyncio.wait_for(audio_received.wait(), timeout=300) + except TimeoutError: + self.logger.error("No audio data received from source after timeout") + self._stdin_task.cancel() + + asyncio.create_task(stdin_watchdog()) + try: - async for chunk in TimedAsyncGenerator(self.audio_input, 300): - audio_received = True - if self.proc and self.proc.returncode is not None: - raise AudioError("Parent process already exited") + async for chunk in self.audio_input: + if not audio_received.is_set(): + audio_received.set() await self.write(chunk) generator_exhausted = True - if not audio_received: + if not audio_received.is_set(): raise AudioError("No audio data received from source") except Exception as err: if isinstance(err, asyncio.CancelledError): -- 2.34.1