Skip to content

运行时流与旁路输出

runtime_stream 不是业务状态,也不是最终结果。它的职责是: 让长流程在执行中途就能被外部观察和消费。

适合什么时候读

  • 你想把 TriggerFlow 的中间态推给 UI、SSE、日志或外部消费者
  • 你已经在用异步 chunk,准备把模型流式结果继续往外发
  • 你不确定 runtime_streamstate / result 的边界

你会学到什么

  • runtime_stream 为什么是旁路观察通道
  • 为什么工程上更推荐 async_put_into_stream(...)get_async_runtime_stream(...)
  • 它和 instant 结构化流式、async_emit(...) 的关系

运行时流链路

这张图强调的是: runtime_stream 和业务状态并列存在, 它承担的是“观察链路”,不是“可恢复状态”。

推荐原则

  • 新的工程集成优先使用异步接口
  • 中间态展示走 runtime_stream
  • 可恢复业务状态走 data.state
  • 最终交付结果走 result

同步接口更适合 demo、脚本和本地调试;只要是服务端或长期运行流程,就建议默认异步。

最小示例

python
from agently import TriggerFlow, TriggerFlowRuntimeData

flow = TriggerFlow()


@flow.chunk("stream_steps")
async def stream_steps(data: TriggerFlowRuntimeData):
    await data.async_put_into_stream({"step": 1})
    await data.async_put_into_stream({"step": 2})
    await data.async_stop_stream()
    return "done"


flow.to(stream_steps)

消费侧推荐:

python
execution = flow.create_execution()

async for event in execution.get_async_runtime_stream("start", timeout=None):
    print(event)

instant 组合时怎么用

这就是最推荐的实时链路:

  • 模型用 instant 暴露结构化节点
  • chunk 把高价值节点写进 runtime_stream
  • 同时对真正需要继续路由的节点做 async_emit(...)

sub_flow 的 stream 桥接

默认情况下,child execution 的 runtime stream 会桥接到 parent execution。

这意味着:

  • UI 通常只需要监听父 execution
  • 子流程里的旁路事件不会天然丢失

但要注意: 这只解决观察链路,不解决父子状态同步。

timeout 和停止

  • timeout 控制的是消费侧等待新事件的时长
  • async_stop_stream() 结束的是 stream,不是 execution 本身

如果外部消费者认为流程已经“卡住”,先确认的是:

  • 是否真的没有新 stream 事件
  • 是否遗漏了显式 stop
  • 是否把业务状态误当成了 stream

常见误区

  • runtime_stream 当成最终数据同步机制。
  • 在异步服务里仍然优先用同步 get_runtime_stream(...)
  • 把所有 token 级噪声都直接推给 stream,而不是先用 instant 或业务事件做收敛。

下一步去哪

  • agently-triggerflow-interrupts-and-stream
  • agently-triggerflow-model-integration