Concurrency Limits

Unbounded parallelism can trip rate limits, exhaust database connection pools, or spike CPU usage. A bounded concurrency policy keeps the number of simultaneous tasks under a fixed cap.

Scenario and solution

Scenario: external services enforce strict QPS limits or offer only small connection pools.
Problem: unbounded parallelism leads to throttling and instability.
Solution: set concurrency when creating the execution (flow.create_execution(concurrency=...)) to cap how many chunks run in parallel.

python
import asyncio
from agently import TriggerFlow, TriggerFlowEventData

flow = TriggerFlow()

# Each chunk simulates a short unit of I/O-bound work.
@flow.chunk("a")
async def work(data: TriggerFlowEventData):
    print(f"start-{data.value}")
    await asyncio.sleep(0.01)
    print(f"end-{data.value}")
    return data.value

@flow.chunk("b")
async def work_b(data: TriggerFlowEventData):
    return await work(data)

@flow.chunk("c")
async def work_c(data: TriggerFlowEventData):
    return await work(data)

# Fan out to the three chunks as one batch.
flow.batch(work, work_b, work_c).end()

# concurrency=1 allows only one chunk to run at a time.
execution = flow.create_execution(concurrency=1)
result = execution.start("X")
print(result)

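Output (with concurrency=1, each chunk finishes before the next one starts, so the start/end pairs never interleave):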

text
start-X
end-X
start-X
end-X
start-X
end-X
{'a': 'X', 'b': 'X', 'c': 'X'}
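
For intuition about what a concurrency cap does, the same idea can be sketched with plain asyncio and a semaphore. This is only an illustration of the general technique, not a description of how TriggerFlow implements concurrency internally. The names run_bounded, fake_call, and make_jobs are hypothetical helpers invented for this sketch; the peak counter is there only to show that the number of in-flight tasks never exceeds the limit.

```python
import asyncio


async def run_bounded(jobs, limit):
    """Run zero-argument coroutine factories with at most `limit` in flight."""
    semaphore = asyncio.Semaphore(limit)
    in_flight = 0
    peak = 0

    async def guarded(job):
        nonlocal in_flight, peak
        # Tasks beyond the limit wait here until a slot is released.
        async with semaphore:
            in_flight += 1
            peak = max(peak, in_flight)
            try:
                return await job()
            finally:
                in_flight -= 1

    # gather preserves the order of the input jobs in its result list.
    results = await asyncio.gather(*(guarded(job) for job in jobs))
    return results, peak


async def fake_call(name):
    # Hypothetical stand-in for a request to a rate-limited service.
    await asyncio.sleep(0.01)
    return name


def make_jobs():
    # Bind each name as a default argument so every lambda keeps its own value.
    return [lambda n=n: fake_call(n) for n in ("a", "b", "c", "d")]


if __name__ == "__main__":
    for limit in (1, 2, 4):
        results, peak = asyncio.run(run_bounded(make_jobs(), limit))
        print(f"limit={limit} peak={peak} results={results}")
```

With a limit of 1 the jobs run strictly one after another, which mirrors the sequential start/end pairs shown above; raising the limit lets more jobs overlap, up to the cap and never beyond it.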