Skip to content

并发上限控制

并发太高会导致外部服务限流、数据库连接耗尽、CPU 抖动。我们需要一个“可控上限”,确保系统稳定。

场景与解决方式

场景:外部服务有 QPS 上限,或数据库连接池较小。
命题:并发必须被限制,否则会触发限流或雪崩。
解决:在执行实例上设置 concurrency,统一限制并发深度。

python
import asyncio
from agently import TriggerFlow, TriggerFlowEventData

flow = TriggerFlow()

@flow.chunk("a")
async def work(data: TriggerFlowEventData):
  print(f"start-{data.value}")
  await asyncio.sleep(0.01)
  print(f"end-{data.value}")
  return data.value

@flow.chunk("b")
async def work_b(data: TriggerFlowEventData):
  return await work(data)

@flow.chunk("c")
async def work_c(data: TriggerFlowEventData):
  return await work(data)

flow.batch(work, work_b, work_c).end()

execution = flow.create_execution(concurrency=1)
result = execution.start("X")
print(result)

输出:

text
start-X
end-X
start-X
end-X
start-X
end-X
{'a': 'X', 'b': 'X', 'c': 'X'}