Only use TimedAsyncGenerator in one location of the chain and with a high timeout, just to detect it, not to recover.
from .ffmpeg import FFMpeg, get_ffmpeg_stream
from .playlists import IsHLSPlaylist, PlaylistItem, fetch_playlist, parse_m3u
from .process import AsyncProcess, communicate
-from .util import TimedAsyncGenerator, create_tempfile, detect_charset
+from .util import create_tempfile, detect_charset
if TYPE_CHECKING:
from music_assistant_models.config_entries import CoreConfig, PlayerConfig
pcm_format.content_type.value,
ffmpeg_proc.proc.pid,
)
- async for chunk in TimedAsyncGenerator(
- ffmpeg_proc.iter_chunked(pcm_format.pcm_sample_size), timeout=30
- ):
+ async for chunk in ffmpeg_proc.iter_chunked(pcm_format.pcm_sample_size):
# for non-tracks we just yield all chunks directly
if streamdetails.media_type != MediaType.TRACK:
yield chunk
log_tail = ""
logger.debug(
"stream %s (with code %s) for %s - seconds streamed: %s %s",
- "finished" if finished else "aborted",
+ "cancelled" if cancelled else "finished" if finished else "aborted",
ffmpeg_proc.returncode,
streamdetails.uri,
seconds_streamed,
try:
start = time.time()
self.logger.log(VERBOSE_LOG_LEVEL, "Start reading audio data from source...")
- async for chunk in TimedAsyncGenerator(self.audio_input, timeout=30):
+ # use TimedAsyncGenerator to catch we're stuck waiting on data forever
+ # don't set this timeout too low because in some cases it can indeed take a while
+ # for data to arrive (e.g. when there is X amount of seconds in the buffer)
+ # so this timeout is just to catch if the source is stuck and rpeort it and not
+ # to recover from it.
+ async for chunk in TimedAsyncGenerator(self.audio_input, timeout=300):
await self.write(chunk)
self.logger.log(
VERBOSE_LOG_LEVEL, "Audio data source exhausted in %.2fs", time.time() - start
from music_assistant.helpers.json import json_loads
from music_assistant.helpers.process import AsyncProcess, check_output
from music_assistant.helpers.throttle_retry import ThrottlerManager, throttle_with_retries
-from music_assistant.helpers.util import TimedAsyncGenerator, lock, parse_title_and_version
+from music_assistant.helpers.util import lock, parse_title_and_version
from music_assistant.models.music_provider import MusicProvider
from .helpers import get_librespot_binary
if stderr:
log_reader = asyncio.create_task(_read_stderr())
- async for chunk in TimedAsyncGenerator(librespot_proc.iter_any(chunk_size), 20):
+ async for chunk in librespot_proc.iter_any(chunk_size):
yield chunk
bytes_received += len(chunk)
if stderr: