Skip to content

从 Token 输出到实时信号

这页讲的是当前最值得推荐的一条工程链路: response.get_async_generator(type="instant") 把结构化字段提前暴露出来,再由 TriggerFlow 把它们变成受控事件和 runtime stream。

适合什么时候读

  • 你已经在做结构化流式输出
  • 某些字段一旦完成就有业务价值,不想等整份结果结束
  • 你要把模型流式结果接进 TriggerFlow 的路由、并发或旁路输出

你会学到什么

  • 为什么 instant 比原始 token 更适合驱动 TriggerFlow
  • 如何用 async_emit(...) 把完成字段变成业务信号
  • 如何同时把中间态写进 runtime_stream

核心链路

这张图想说明的重点是: 不是所有 token 都值得变成业务事件。instant 先把它们收敛成结构化节点,再交给 TriggerFlow,噪声会少很多。

为什么推荐 instant 而不是直接用 delta

  • instant 给的是字段级、列表项级的结构化节点
  • 更容易基于 path / wildcard_path 做稳定匹配
  • 更适合在 is_complete=True 时触发下游工作

delta 仍然有用,但更适合:

  • UI 打字机效果
  • 轻量日志
  • 很便宜的状态更新

推荐模式

  1. 先用 output() 定义结构
  2. 在 async chunk 里固定一次 response = request.get_response()
  3. async for item in response.get_async_generator(type="instant") 消费结构化节点
  4. 只有在 item.is_complete 时才做事件派发
  5. 派发时区分两类出口
  • 需要下游继续处理的: await data.async_emit(...)
  • 只需要外部观察的: await data.async_put_into_stream(...)

最小示例

python
from agently import Agently, TriggerFlow, TriggerFlowRuntimeData

agent = Agently.create_agent()
flow = TriggerFlow()


@flow.chunk("plan")
async def plan(data: TriggerFlowRuntimeData):
    response = (
        agent
        .input(f"为主题 {data.value} 生成标题和待办列表")
        .output(
            {
                "title": (str, "标题"),
                "tasks": [
                    {
                        "task": (str, "任务内容"),
                        "owner": (str, "负责人"),
                    }
                ],
            }
        )
        .get_response()
    )

    async for item in response.get_async_generator(type="instant"):
        if not item.is_complete:
            continue

        if item.path == "title":
            await data.async_put_into_stream({"stage": "title_ready", "title": item.value})

        if item.wildcard_path == "tasks[*]":
            await data.async_emit("TaskReady", item.value)
            await data.async_put_into_stream({"stage": "task_ready", "task": item.value})

    return await response.async_get_data()


@flow.chunk("handle_task")
async def handle_task(data: TriggerFlowRuntimeData):
    return {"accepted": True, "task": data.value}


flow.to(plan)
flow.when("TaskReady").to(handle_task)

这个模式为什么高价值

  • 一次模型请求就能同时服务 UI、中间态日志和下游业务分支
  • 不需要等完整结果生成完再统一拆任务
  • TriggerFlow 把后续工作控制在显式的 when(...) 路由里,而不是在流式循环里无界扩散

关键边界

  • instant 不会自动创建新的模型请求
  • 不要对每个部分 delta 都直接派发新任务
  • item.is_complete 之前的局部片段通常只适合做显示,不适合做重逻辑分发

常见误区

  • 把原始 token 流直接当业务协议。
  • instant 循环里不做边界控制,直接无限起下游任务。
  • 只做 async_emit(...),却不把有价值的中间态写进 runtime_stream

下一步去哪

  • agently-triggerflow-model-integration
  • agently-triggerflow
  • agently-output-control