Skip to content

collect 收敛机制

可视化边界:收敛图用于解释运行逻辑;如果你要导出 JSON/YAML,依然需要具名 handler / condition。

collect() 用来把多条分支的结果收敛到同一个 collection。

1. fan-out / fan-in 结构

如何阅读这张图

  • collect() 不是单独的节点类型,而是分支结果汇合到同一个 collection 的机制。
  • 收敛完成后真正触发下游的是 Collect-<name> 这个内部事件。

2. 基本接口

python
process.collect(
    collection_name: str,
    branch_id: str | None = None,
    mode: "filled_and_update" | "filled_then_empty" = "filled_and_update",
)

收敛后会发出内部事件:

  • Collect-<collection_name>

3. 典型示例

python
flow.when("Plan.Read").to(reader).collect("plan", "read")
flow.when("Plan.Write").to(writer).collect("plan", "write")
flow.when({"collect": "plan"}).to(handle_plan).end()

收敛值形态:

python
{
    "read": "...",
    "write": "...",
}

4. 两种模式

text
filled_and_update:
  - 收敛状态持续保留
  - 同 branch 后续写入会覆盖更新

filled_then_empty:
  - 一轮收敛完成后立即清空内部状态
  - 下一轮重新收集

架构思路

  • filled_and_update 适合长期共享面板或持续更新的聚合视图
  • filled_then_empty 更适合回合式流程、循环处理或一轮一轮独立汇总

5. 使用建议

  • branch_id 应稳定可读,不要用业务无意义的随机值
  • 多轮循环里更推荐 filled_then_empty
  • 如果还要让其他链路继续监听收敛结果,可直接 when({"collect": name})