from .ffmpeg import FFMpeg, get_ffmpeg_stream
from .playlists import IsHLSPlaylist, PlaylistItem, fetch_playlist, parse_m3u
from .process import AsyncProcess, check_output, communicate
-from .util import TimedAsyncGenerator, create_tempfile, detect_charset
+from .util import create_tempfile, detect_charset
if TYPE_CHECKING:
from music_assistant_models.config_entries import CoreConfig, PlayerConfig
)
try:
await ffmpeg_proc.start()
- async for chunk in TimedAsyncGenerator(
- ffmpeg_proc.iter_chunked(pcm_format.pcm_sample_size), 300
- ):
+ async for chunk in ffmpeg_proc.iter_chunked(pcm_format.pcm_sample_size):
# for radio streams we just yield all chunks directly
if streamdetails.media_type == MediaType.RADIO:
yield chunk
from music_assistant.constants import VERBOSE_LOG_LEVEL
from .process import AsyncProcess
-from .util import TimedAsyncGenerator, close_async_generator
+from .util import close_async_generator
if TYPE_CHECKING:
from music_assistant_models.media_items import AudioFormat
content_type_raw = line.split(": Audio: ")[1].split(" ")[0]
content_type = ContentType.try_parse(content_type_raw)
self.logger.debug(
- "Detected (input) content type: %s (%s)", content_type, content_type_raw
+ "Detected (input) content type: %s (%s)",
+ content_type,
+ content_type_raw,
)
self.input_format.content_type = content_type
del line
if TYPE_CHECKING:
self.audio_input: AsyncGenerator[bytes, None]
generator_exhausted = False
- audio_received = False
+ audio_received = asyncio.Event()
+
+ async def stdin_watchdog() -> None:
+ # this is a simple watchdog to ensure we don't get stuck forever waiting for audio data
+ try:
+ await asyncio.wait_for(audio_received.wait(), timeout=300)
+ except TimeoutError:
+ self.logger.error("No audio data received from source after timeout")
+ self._stdin_task.cancel()
+
+ asyncio.create_task(stdin_watchdog())
+
try:
- async for chunk in TimedAsyncGenerator(self.audio_input, 300):
- audio_received = True
- if self.proc and self.proc.returncode is not None:
- raise AudioError("Parent process already exited")
+ async for chunk in self.audio_input:
+ if not audio_received.is_set():
+ audio_received.set()
await self.write(chunk)
generator_exhausted = True
- if not audio_received:
+ if not audio_received.is_set():
raise AudioError("No audio data received from source")
except Exception as err:
if isinstance(err, asyncio.CancelledError):