Skip to content

for_each 列表并发处理

for_each 让列表元素并发进入同一个处理节点,并把结果按原顺序汇合。

场景、命题与解决

场景:对列表逐项执行相同处理,例如批量校验、批量特征提取。
命题:逐项串行会拉长整体耗时。
解决:用 for_each 对列表并发执行,并汇总结果。

python
import asyncio
from agently import TriggerFlow, TriggerFlowEventData

flow = TriggerFlow()

@flow.chunk
def input_list(_: TriggerFlowEventData):
  return [1, 2, 3]

@flow.chunk
async def square(data: TriggerFlowEventData):
  await asyncio.sleep(0.01)
  return data.value * data.value

(
  flow.to(input_list)
  .for_each(concurrency=2)
  .to(square)
  .end_for_each()
  .end()
)
result = flow.start()
print(result)

输出:

text
[1, 4, 9]