运行时流式输出
适用版本:4.0.8.1+
runtime_stream 是 TriggerFlow 的旁路输出通道:
- 不改变主结果
result - 可实时输出进度、token、阶段状态
适合:SSE、WebSocket、CLI 实时反馈、任务监控面板。
1. 基础模型
2. 基础使用:推送 + 停流
python
import asyncio
from agently import TriggerFlow, TriggerFlowEventData
flow = TriggerFlow()
@flow.chunk
async def stream_steps(data: TriggerFlowEventData):
for i in range(3):
await data.async_put_into_stream({"step": i + 1, "status": "working"})
await asyncio.sleep(0.05)
await data.async_stop_stream()
return "done"
flow.to(stream_steps).end()
for event in flow.get_runtime_stream("start", timeout=None):
print("[stream]", event)3. API 说明
生产中常用接口:
data.put_into_stream(...)/await data.async_put_into_stream(...)data.stop_stream()/await data.async_stop_stream()flow.get_runtime_stream(...)flow.get_async_runtime_stream(...)
4. 流式与最终结果并行
典型模式:
- 先连续推送
delta事件 - 收敛后推送
final事件 - 最后
stop_stream
这与 FastAPIHelper 的 SSE/WS 模式天然匹配。
5. 同步生成器桥接(新增能力)
v4.0.8.1 新增 FunctionShifter.asyncify_sync_generator(),用于把同步 generator 安全桥接为 async generator。
应用意义:
- 旧同步流代码可接入异步 runtime stream/HTTP 流
- 降低迁移异步改造成本
6. 常见问题
6.1 消费端一直阻塞
通常是忘记 stop_stream()。请保证所有分支都能停流(含异常分支)。
6.2 流里有事件但没有最终结果
runtime_stream 与 result 独立。请在 chunk 内明确 return 业务结果。
6.3 大量并发下流序错乱
若你有并发分支,建议为事件添加 source/chunk_id/seq 字段便于前端重排。