for_each 列表并发处理
for_each 适合“同一种处理逻辑并发作用到列表每一项”。
推荐示例
python
import asyncio
from agently import TriggerFlow, TriggerFlowRuntimeData
flow = TriggerFlow()
@flow.chunk("input_list")
async def input_list(_: TriggerFlowRuntimeData):
return [1, 2, 3]
@flow.chunk("square")
async def square(data: TriggerFlowRuntimeData):
await asyncio.sleep(0.01)
return data.value * data.value
(
flow.to(input_list)
.for_each(concurrency=2)
.to(square)
.end_for_each()
.end()
)适用场景
- 批量校验
- 批量特征提取
- 批量检索或转换
当前最佳实践
for_each内的单项处理应尽量无共享副作用- 需要访问外部工具时,用
runtime_resources注入,不要闭包包依赖 - 如果单项任务很重,显式设置
concurrency
不再推荐
- 用
for_each实现需要逐项等待外部审批的长生命周期流程 - 在 item handler 里写依赖强顺序的共享可变逻辑