Skip to content

TriggerFlow 编排 Playbook

当一个 AI 过程有多个阶段时,写成一段长函数很快会失控:分类结果决定分支,多个 item 要并行处理,中途还可能等人工或外部回调。TriggerFlow 的作用,是把这些阶段变成可观测、可暂停、可恢复的 execution。

先判断是否需要 TriggerFlow

过程特点建议
只是一次模型请求加结构化输出留在 request 层
只有一个 Action 调用留在 Agent / Action 层
有 3 个以上业务阶段考虑 TriggerFlow
中间结果决定分支TriggerFlow
要 fan-out 并行处理一组 itemTriggerFlow
要 pause 等人工或 WebhookTriggerFlow
要把进度流给 UITriggerFlow runtime stream
要跨进程重启恢复TriggerFlow save/load

不是所有流程都要上编排。TriggerFlow 适合让阶段边界、等待点和输出流变清楚。

推荐结构

text
build_flow()
  |
  |-- prepare      校验和归一化输入
  |-- classify     模型结构化输出
  |-- branch       if / match / case
  |-- fan-out      batch 或 for_each
  |-- pause        人工 / Webhook 等待
  |-- finalize     写最终 state

run_flow()
  |
  |-- create_execution(auto_close=False, runtime_resources={...})
  |-- async_start(...)
  |-- consume runtime stream
  |-- async_close() -> close snapshot

服务场景里,flow definition 和一次 execution 分开。definition 可以作为模块级对象或 builder;execution 是某次请求。

基础骨架

python
from agently import TriggerFlow, TriggerFlowRuntimeData


def build_flow():
    flow = TriggerFlow(name="review-flow")

    async def prepare(data: TriggerFlowRuntimeData):
        payload = {"text": data.input["text"], "source": data.input.get("source")}
        await data.async_set_state("request", payload)
        return payload

    async def classify(data: TriggerFlowRuntimeData):
        agent = data.require_resource("agent")
        result = (
            agent
            .input(data.input["text"])
            .output({"category": (str, "A/B/C", True)})
            .get_result()
        )
        classification = await result.async_get_data(ensure_keys=["category"])
        await data.async_set_state("classification", classification)
        return classification

    async def handle_default(data: TriggerFlowRuntimeData):
        await data.async_set_state("answer", {"category": data.input["category"]})

    (
        flow.to(prepare)
        .to(classify)
        .if_condition(lambda data: data.input["category"] == "A")
            .to(handle_default)
        .elif_condition(lambda data: data.input["category"] == "B")
            .to(handle_default)
        .else_condition()
            .to(handle_default)
        .end_condition()
    )
    return flow


async def run(input_value, agent):
    flow = build_flow()
    execution = flow.create_execution(
        auto_close=False,
        runtime_resources={"agent": agent},
    )
    await execution.async_start(input_value)
    return await execution.async_close()

几个边界:

  • agent 是 live 对象,放 runtime_resources
  • 中间业务结果写 state,进入 close snapshot。
  • 服务端显式 close,便于接 stream、pause、外部事件。

fan-out:处理未知数量 item

python
async def list_subtasks(data):
    return data.input["subtasks"]


async def handle_one(data):
    agent = data.require_resource("agent")
    result = agent.input(data.input).output({
        "summary": (str, "摘要", True),
    }).get_result()
    return await result.async_get_data(ensure_keys=["summary"])


(
    flow.to(list_subtasks)
    .for_each(concurrency=4)
        .to(handle_one)
    .end_for_each()
    .to(collect)
)

concurrency 按模型限速和下游 API 承载设置。结果会按输入顺序汇总成 list。

pause:等人工或外部系统

python
async def ask_approval(data):
    return await data.async_pause_for(
        type="approval",
        payload={"summary": data.input["summary"]},
        resume_to="next",
    )

含 pause 的 execution 要保留 handle,通常也要保存 execution state:

python
execution = flow.create_execution(auto_close=False, runtime_resources={...})
await execution.async_start(input_value)
saved = execution.save()

恢复时重新创建 execution、注入资源、load saved,再 continue_with(...)

runtime stream:给 UI 进度

chunk 内推 item:

python
await data.async_put_into_stream({
    "stage": "classify",
    "status": "done",
    "category": classification["category"],
})

服务侧消费:

python
async for item in execution.get_async_runtime_stream(timeout=None):
    send_to_client(item)

最终业务结果仍然从 async_close() 返回的 close snapshot 读取。

save/load:跨重启恢复

python
saved = execution.save()
db.put(execution.id, saved)

restored = flow.create_execution(
    auto_close=False,
    runtime_resources={"agent": agent, "db": db_client},
)
restored.load(db.get(execution.id))

save 保存 state、pending interrupt、metadata 和 resource_keys;不保存 live resources,也不保存正在运行到一半的协程。

常见误用

写法问题
为了整理代码而拆子流子流应该有复用契约,不只是缩短文件
在 chunk 里手写额外模型重试request 层已有 validate / retry;流程级重试要显式建图
把 DB client、agent、collection 放进 statesnapshot 不该包含 live 对象
只用 flow.async_start(...) 却又想 pause / 外部 emit隐式 execution 没有可恢复 handle

另见