# we need to restart playback
self.mass.create_task(self.resume(queue_id))
else:
- self.mass.create_task(self._enqueue_next(queue, queue.current_index))
+ self.mass.call_later(5, self._enqueue_next(queue, queue.current_index))
@api_command("player_queues/play_media")
async def play_media(
from .process import AsyncProcess, check_output, communicate
from .tags import parse_tags
from .throttle_retry import BYPASS_THROTTLER
-from .util import create_tempfile
+from .util import TimedAsyncGenerator, create_tempfile
if TYPE_CHECKING:
from music_assistant.common.models.player_queue import QueueItem
)
try:
await ffmpeg_proc.start()
- async for chunk in ffmpeg_proc.iter_chunked(pcm_format.pcm_sample_size):
+ async for chunk in TimedAsyncGenerator(
+ ffmpeg_proc.iter_chunked(pcm_format.pcm_sample_size), 60
+ ):
# for radio streams we just yield all chunks directly
if streamdetails.media_type == MediaType.RADIO:
yield chunk
from music_assistant.constants import VERBOSE_LOG_LEVEL
from .process import AsyncProcess
-from .util import close_async_generator
+from .util import TimedAsyncGenerator, close_async_generator
LOGGER = logging.getLogger("ffmpeg")
generator_exhausted = False
audio_received = False
try:
- async for chunk in self.audio_input:
+ async for chunk in TimedAsyncGenerator(self.audio_input, 30):
audio_received = True
await self.write(chunk)
generator_exhausted = True
) as ffmpeg_proc:
# read final chunks from stdout
iterator = ffmpeg_proc.iter_chunked(chunk_size) if chunk_size else ffmpeg_proc.iter_any()
- async for chunk in iterator:
+ async for chunk in TimedAsyncGenerator(iterator, 60):
yield chunk
return await func(*args, **kwargs)
return wrapper
+
+
+class TimedAsyncGenerator:
+ """
+ Async iterable that times out after a given time.
+
+ Source: https://medium.com/@dmitry8912/implementing-timeouts-in-pythons-asynchronous-generators-f7cbaa6dc1e9
+ """
+
+ def __init__(self, iterable, timeout=0):
+ """
+ Initialize the AsyncTimedIterable.
+
+ Args:
+ iterable: The async iterable to wrap.
+ timeout: The timeout in seconds for each iteration.
+ """
+
+ class AsyncTimedIterator:
+ def __init__(self):
+ self._iterator = iterable.__aiter__()
+
+ async def __anext__(self):
+ result = await asyncio.wait_for(self._iterator.__anext__(), int(timeout))
+ if not result:
+ raise StopAsyncIteration
+ return result
+
+ self._factory = AsyncTimedIterator
+
+ def __aiter__(self):
+ """Return the async iterator."""
+ return self._factory()
"pipe",
"--single-track",
spotify_uri,
- "--token",
+ "--access-token",
auth_info["access_token"],
]
if seek_position: