import asyncio
import hashlib
import html
+import inspect
import os
import urllib.parse
from collections.abc import Awaitable, Callable
if hasattr(result, "__anext__"):
# handle async generator (for really large listings)
result = [item async for item in result]
- elif asyncio.iscoroutine(result):
+ elif inspect.iscoroutine(result):
result = await result
return web.json_response(result, dumps=json_dumps)
except Exception as e:
from __future__ import annotations
import asyncio
+import inspect
import logging
from concurrent import futures
from contextlib import suppress
)
items = []
result = items
- elif asyncio.iscoroutine(result):
+ elif inspect.iscoroutine(result):
result = await result
await self._send_message(SuccessResultMessage(msg.message_id, result))
except Exception as err:
from __future__ import annotations
-import asyncio
import inspect
import logging
import logging.handlers
check_func = check_func.func
wrapper_func: Callable[..., None] | Callable[..., Coroutine[Any, Any, None]]
- if asyncio.iscoroutinefunction(check_func):
+ if inspect.iscoroutinefunction(check_func):
async_func = cast("Callable[..., Coroutine[Any, Any, None]]", func)
@wraps(async_func)
from __future__ import annotations
import asyncio
+import inspect
import logging
import os
import pathlib
continue
if not (id_filter is None or object_id in id_filter):
continue
- if asyncio.iscoroutinefunction(cb_func):
+ if inspect.iscoroutinefunction(cb_func):
if TYPE_CHECKING:
cb_func = cast("Callable[[MassEvent], Coroutine[Any, Any, None]]", cb_func)
self.create_task(cb_func, event_obj)
return existing
self.verify_event_loop_thread("create_task")
- if asyncio.iscoroutinefunction(target):
+ if inspect.iscoroutinefunction(target):
# coroutine function
task = self.loop.create_task(target(*args, **kwargs))
- elif asyncio.iscoroutine(target):
+ elif inspect.iscoroutine(target):
# coroutine
task = self.loop.create_task(target)
elif callable(target):
self._tracked_timers.pop(task_id)
self.create_task(_target, *args, task_id=task_id, abort_existing=True, **kwargs)
- if asyncio.iscoroutinefunction(target) or asyncio.iscoroutine(target):
+ if inspect.iscoroutinefunction(target) or inspect.iscoroutine(target):
# coroutine function
if TYPE_CHECKING:
target = cast("Coroutine[Any, Any, _R]", target)
from __future__ import annotations
import asyncio
+import inspect
import json
import logging
from contextlib import suppress
# Execute the handler
result = handler.target(**args)
- if asyncio.iscoroutine(result):
+ if inspect.iscoroutine(result):
result = await result
return result