for_each 列表并发处理
for_each 让列表元素并发进入同一个处理节点,并把结果按原顺序汇合。
场景、命题与解决
场景:对列表逐项执行相同处理,例如批量校验、批量特征提取。
命题:逐项串行会拉长整体耗时。
解决:用 for_each 对列表并发执行,并汇总结果。
python
import asyncio
from agently import TriggerFlow, TriggerFlowEventData
flow = TriggerFlow()
@flow.chunk
def input_list(_: TriggerFlowEventData):
return [1, 2, 3]
@flow.chunk
async def square(data: TriggerFlowEventData):
await asyncio.sleep(0.01)
return data.value * data.value
(
flow.to(input_list)
.for_each(concurrency=2)
.to(square)
.end_for_each()
.end()
)
result = flow.start()
print(result)输出:
text
[1, 4, 9]