Events & Signals

TriggerFlow runs on a when-emit signal loop: emit() sends a signal, when() subscribes to it, and to() binds a handler to that subscription. When a handler finishes, it emits a completion signal that downstream steps can react to.
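To make the loop concrete, here is a minimal sketch of a when/emit dispatcher in plain Python. It is a toy model for illustration only, not TriggerFlow's implementation: the ToySignalLoop class, its when(signal, handler) signature, and the "<handler name>.Done" completion-signal naming are all assumptions made for this example.

```python
import asyncio
from collections import defaultdict


class ToySignalLoop:
    """Toy when/emit dispatcher (hypothetical; not TriggerFlow's implementation)."""

    def __init__(self):
        self._handlers = defaultdict(list)  # signal name -> subscribed handlers
        self.log = []                       # every signal emitted, in order

    def when(self, signal, handler):
        """Subscribe a handler to a signal (stands in for when(...).to(...))."""
        self._handlers[signal].append(handler)

    async def emit(self, signal, value=None):
        """Record the signal, run its handlers, then emit each handler's completion signal."""
        self.log.append(signal)
        for handler in list(self._handlers[signal]):
            result = await handler(value)
            # Assumed naming scheme for the completion signal.
            await self.emit(f"{handler.__name__}.Done", result)


async def reader(value):
    return f"read: {value['task']}"


async def report(value):
    return value.upper()


signals = ToySignalLoop()
signals.when("Plan.Read", reader)    # reader runs when "Plan.Read" is emitted
signals.when("reader.Done", report)  # report runs when reader completes
asyncio.run(signals.emit("Plan.Read", {"task": "read"}))
print(signals.log)
```

The log shows the chain: the original signal, then one completion signal per handler that ran. A downstream step only needs to subscribe to the completion signal of the step before it.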

emit + when

python
from agently import TriggerFlow, TriggerFlowEventData

flow = TriggerFlow()

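@flow.chunk
async def planner(data: TriggerFlowEventData):
    # Emit one event per sub-task; each payload reaches the subscriber as data.value.
    await data.async_emit("Plan.Read", {"task": "read"})
    await data.async_emit("Plan.Write", {"task": "write"})
    return "plan done"

@flow.chunk
async def reader(data: TriggerFlowEventData):
    # data.value is the payload emitted with "Plan.Read".
    return f"read: {data.value['task']}"

@flow.chunk
async def writer(data: TriggerFlowEventData):
    # data.value is the payload emitted with "Plan.Write".
    return f"write: {data.value['task']}"

# Main path: run the planner, which emits both events.
flow.to(planner).end()
# Each when() branch handles one event and reports into the "plan" collection.
flow.when("Plan.Read").to(reader).collect("plan", "read")
flow.when("Plan.Write").to(writer).collect("plan", "write").end()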

Trigger sources

when() can listen to an event name, a runtime_data key, or a flow_data key:

python
flow.when("Plan.Read")               # event
flow.when({"runtime_data": "user"})  # runtime_data
flow.when({"flow_data": "env"})      # flow_data

Naming

  • Use Domain.Action (e.g. Plan.Read, Tool.Run)
  • Avoid overloading a single event name; give each distinct action its own event
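
The Domain.Action convention can be checked mechanically. The sketch below is one possible reading of it, assuming exactly two PascalCase segments joined by a single dot; the is_valid_event_name helper and the regular expression are hypothetical and not part of TriggerFlow.

```python
import re

# Assumed convention: two PascalCase segments joined by a single dot.
EVENT_NAME = re.compile(r"[A-Z][A-Za-z0-9]*\.[A-Z][A-Za-z0-9]*")


def is_valid_event_name(name: str) -> bool:
    """Return True if name looks like Domain.Action (hypothetical helper)."""
    return EVENT_NAME.fullmatch(name) is not None


for name in ["Plan.Read", "Tool.Run", "plan_read", "Plan.Read.Extra"]:
    print(name, is_valid_event_name(name))
```

A check like this could run once over the event names used in a flow, so a misspelled or inconsistently cased name is caught before a when() subscription silently fails to match.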