Skip to content

运行时流式输出

适用版本:4.0.8.1+

runtime_stream 是 TriggerFlow 的旁路输出通道:

  • 不改变主结果 result
  • 可实时输出进度、token、阶段状态

适合:SSE、WebSocket、CLI 实时反馈、任务监控面板。

1. 基础模型

2. 基础使用:推送 + 停流

python
import asyncio
from agently import TriggerFlow, TriggerFlowEventData

flow = TriggerFlow()

@flow.chunk
async def stream_steps(data: TriggerFlowEventData):
    for i in range(3):
        await data.async_put_into_stream({"step": i + 1, "status": "working"})
        await asyncio.sleep(0.05)
    await data.async_stop_stream()
    return "done"

flow.to(stream_steps).end()

for event in flow.get_runtime_stream("start", timeout=None):
    print("[stream]", event)

3. API 说明

生产中常用接口:

  • data.put_into_stream(...) / await data.async_put_into_stream(...)
  • data.stop_stream() / await data.async_stop_stream()
  • flow.get_runtime_stream(...)
  • flow.get_async_runtime_stream(...)

4. 流式与最终结果并行

典型模式:

  • 先连续推送 delta 事件
  • 收敛后推送 final 事件
  • 最后 stop_stream

这与 FastAPIHelper 的 SSE/WS 模式天然匹配。

5. 同步生成器桥接(新增能力)

v4.0.8.1 新增 FunctionShifter.asyncify_sync_generator(),用于把同步 generator 安全桥接为 async generator。

应用意义:

  • 旧同步流代码可接入异步 runtime stream/HTTP 流
  • 降低迁移异步改造成本

6. 常见问题

6.1 消费端一直阻塞

通常是忘记 stop_stream()。请保证所有分支都能停流(含异常分支)。

6.2 流里有事件但没有最终结果

runtime_streamresult 独立。请在 chunk 内明确 return 业务结果。

6.3 大量并发下流序错乱

若你有并发分支,建议为事件添加 source/chunk_id/seq 字段便于前端重排。