Skip to content

for_each 列表并发处理

for_each 适合“同一种处理逻辑并发作用到列表每一项”。

推荐示例

python
import asyncio
from agently import TriggerFlow, TriggerFlowRuntimeData

flow = TriggerFlow()

@flow.chunk("input_list")
async def input_list(_: TriggerFlowRuntimeData):
    return [1, 2, 3]

@flow.chunk("square")
async def square(data: TriggerFlowRuntimeData):
    await asyncio.sleep(0.01)
    return data.value * data.value

(
    flow.to(input_list)
    .for_each(concurrency=2)
    .to(square)
    .end_for_each()
    .end()
)

适用场景

  • 批量校验
  • 批量特征提取
  • 批量检索或转换

当前最佳实践

  • for_each 内的单项处理应尽量无共享副作用
  • 需要访问外部工具时,用 runtime_resources 注入,不要闭包包依赖
  • 如果单项任务很重,显式设置 concurrency

不再推荐

  • for_each 实现需要逐项等待外部审批的长生命周期流程
  • 在 item handler 里写依赖强顺序的共享可变逻辑