异步执行支持
Agently 内核是 async-first。模型请求、结果读取、TriggerFlow 编排与运行态流式输出均提供异步版本,适合 Web 服务和高并发场景。
建议在您自己的异步运行环境中(如 FastAPI 等异步框架)使用这些异步方法,以获得更高并发性能。本文档中的同步示例,与下方清单中一一对应的同步方法,都可以直接替换为异步版本。
异步方法清单
以下为框架内最终暴露的异步方法与异步生成器入口,按对象归类:
Agent / Request
Agently.create_agent()与Agently.create_request()都具备相同的异步接口。
agent.async_start()agent.async_get_text()agent.async_get_data()agent.async_get_data_object()agent.async_get_meta()agent.get_async_generator(...)
ModelResponse / ModelResponseResult
response.async_get_text()response.async_get_data()response.async_get_data_object()response.async_get_meta()response.get_async_generator(...)
TriggerFlow(Flow)
flow.async_start(...)flow.async_start_execution(...)flow.async_set_flow_data(...)flow.async_append_flow_data(...)flow.async_del_flow_data(...)flow.get_async_runtime_stream(...)
TriggerFlow(Execution)
execution.async_start(...)execution.async_get_result(...)execution.async_emit(...)execution.async_set_runtime_data(...)execution.async_append_runtime_data(...)execution.async_del_runtime_data(...)execution.async_put_into_stream(...)execution.async_stop_stream(...)execution.get_async_runtime_stream(...)
TriggerFlowEventData(Chunk 内)
data.async_emit(...)data.async_set_runtime_data(...)data.async_append_runtime_data(...)data.async_del_runtime_data(...)data.async_set_flow_data(...)data.async_append_flow_data(...)data.async_del_flow_data(...)data.async_put_into_stream(...)data.async_stop_stream(...)
工具与 MCP
agent.async_use_mcp(...)
KeyWaiter
agent.async_get_key_result(...)agent.async_wait_keys(...)agent.async_start_waiter(...)
Session / Memo
session.async_judge_resize(...)session.async_resize(...)
最小用法示例
python
import asyncio
from agently import Agently
agent = Agently.create_agent()
async def main():
response = (
agent
.input("用一句话解释可观测性")
.output({"intro": ("str", "一句话")})
.get_response()
)
data = await response.async_get_data()
print(data)
asyncio.run(main())