模型集成
TriggerFlow 负责把多个业务阶段组织成可观测、可等待、可并发的流程。模型调用仍然交给 Agently Agent / request 层。chunk handler 是普通 async 函数,所以在 chunk 里调模型时,重点是三件事:
- 用 async,不阻塞 execution。
- 用结构化输出,让下一段 chunk 拿到稳定字段。
- 只有用户能受益时才把模型增量桥接到 runtime stream。
最小模式:chunk 里发一次结构化请求
from agently import Agently, TriggerFlow, TriggerFlowRuntimeData
agent = Agently.create_agent()
flow = TriggerFlow(name="classify-ticket")
async def classify(data: TriggerFlowRuntimeData):
result = (
agent
.input(data.input["content"])
.output({
"category": (str, "工单分类", True),
"priority": (str, "优先级", True),
})
.get_result()
)
classification = await result.async_get_data(
ensure_keys=["category", "priority"],
)
await data.async_set_state("classification", classification)
return classification
flow.to(classify)这段代码里,Agent 负责模型请求和结构化输出;TriggerFlow 负责把 classification 传给下一段,并把它写进 execution state,最终进入 close snapshot。
在 chunk 里保持 async
TriggerFlow execution 本身是 async 调度。chunk 里调用同步 start() 能跑,但会阻塞 event loop,影响并发、stream 和取消。服务代码里优先使用:
result.async_get_data(...)result.async_get_text()result.async_get_meta()result.get_async_generator(type="instant")
短脚本可以用同步 API,服务和长流程尽量 async-first。
一次请求,多种读取
同一个模型请求经常要读结构化结果、原文和 meta。用 get_result() 拿 result 对象,再从这个对象读取,不会重复发请求。
async def analyze(data):
result = (
agent
.input(data.input)
.output({
"summary": (str, "摘要", True),
"risk": (str, "风险等级", True),
})
.get_result()
)
text = await result.async_get_text()
obj = await result.async_get_data(ensure_keys=["summary", "risk"])
meta = await result.async_get_meta()
await data.async_set_state("raw_text", text)
await data.async_set_state("analysis", obj)
await data.async_set_state("model_meta", meta)
return obj业务字段进 state。provider、token、延迟等运行信息可以进单独的 meta key,或者只写服务日志。
把模型增量桥接到 runtime stream
前端需要边生成边显示时,把 Agent result 的 instant 增量转成 TriggerFlow runtime stream item。
async def draft_reply(data: TriggerFlowRuntimeData):
result = (
agent
.input(data.input["content"])
.output({
"title": (str, "回复标题", True),
"body": (str, "回复正文", True),
})
.get_result()
)
async for item in result.get_async_generator(type="instant"):
if item.delta:
await data.async_put_into_stream({
"type": "model_delta",
"path": item.path,
"delta": item.delta,
"done": item.is_complete,
})
final = await result.async_get_data(ensure_keys=["title", "body"])
await data.async_set_state("draft", final)
return finaltype="instant" 产出的是结构化字段的增量,不是 provider 原始 token。UI 可以按 path 更新标题、正文或其他字段。stream 结束后,async_get_data(...) 从同一个 result 读取最终结构化数据。
按 execution 注入不同 agent
模块级 agent 适合所有 execution 共用同一套模型配置。需要按租户、环境或请求切换模型时,把 agent 放进 runtime_resources。
execution = flow.create_execution(
runtime_resources={
"agent": Agently.create_agent().set_settings("OpenAICompatible", {
"model": tenant_model,
}),
},
)
async def classify(data):
agent = data.require_resource("agent")
result = agent.input(data.input).output({...}).get_result()
return await result.async_get_data()agent 持有模型配置和网络 client,不应该放进 state。state 留给可序列化的业务结果。
多个模型角色放在不同 chunk
一个 flow 可以有多个 agent:小模型分类,大模型起草,另一个模型做校验。
classifier = Agently.create_agent().set_settings("OpenAICompatible", {
"model": "${ENV.CLASSIFIER_MODEL}",
})
writer = Agently.create_agent().set_settings("OpenAICompatible", {
"model": "${ENV.WRITER_MODEL}",
})
async def classify(data):
result = classifier.input(data.input).output({...}).get_result()
return await result.async_get_data()
async def write_reply(data):
result = writer.input(data.input).output({...}).get_result()
return await result.async_get_data()
flow.to(classify).to(write_reply)TriggerFlow 只负责阶段连接和生命周期。每个 agent 仍然保持自己的 prompt、输出结构、工具和模型配置。
校验和重试仍然属于 request 层
chunk 里可以继续使用 request 层的输出控制、校验和重试:
async def extract(data):
result = (
agent
.input(data.input)
.output({"answer": (str, "回答", True)})
.validate(custom_business_check)
.get_result()
)
answer = await result.async_get_data(ensure_keys=["answer"])
await data.async_set_state("answer", answer)
return answer重试预算属于这一次模型请求,不会让 TriggerFlow 其他 chunk 自动重跑。需要流程级重试、反思或人工审批时,把它们显式写成 TriggerFlow 的图层结构。
不要把模型状态塞进 flow_data
flow_data 是 flow 级共享。把“上一次模型答案”“当前会话上下文”放进去,会在并发 execution 之间互相影响。
- 单次 execution 的模型输出:放
state。 - 多轮对话上下文:用 Session / Workspace / Recall 这类专门能力。
- agent、client、tool:放
runtime_resources。
另见
- Async First - 服务代码为什么保持 async
- 模型结果 -
get_result()和 result 缓存 - 输出控制 - 结构化输出、校验和重试
- 事件与流 - runtime stream 怎么被外部消费
- State 与 Resources - agent 和业务结果分别放哪