Events & Signals
TriggerFlow runs on a when‑emit signal loop. emit() sends signals, when() subscribes, and to() binds handlers. When a handler finishes, it emits a completion signal for downstream steps.
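To make the loop concrete, here is a minimal sketch in plain Python that does not use Agently at all. It is not TriggerFlow's implementation: the ToySignalBus class, its when/emit methods, and the "<handler name>.done" completion-event naming are assumptions made only to illustrate how a finished handler can feed a downstream subscriber.

```python
import asyncio
from collections import defaultdict


class ToySignalBus:
    """Hypothetical stand-in for a when-emit loop; not TriggerFlow's internals."""

    def __init__(self):
        self._handlers = defaultdict(list)  # event name -> subscribed handlers
        self.log = []  # (event, value) pairs in emission order

    def when(self, event, handler):
        # Subscribe a handler to an event name.
        self._handlers[event].append(handler)

    async def emit(self, event, value):
        # Record the signal, then run every handler subscribed to it.
        self.log.append((event, value))
        for handler in list(self._handlers[event]):
            result = await handler(value)
            # Assumed convention: a finished handler emits "<handler name>.done".
            await self.emit(f"{handler.__name__}.done", result)


async def reader(value):
    return f"read: {value['task']}"


async def report(value):
    # Downstream step; it only needs to run, so it returns nothing.
    return None


def run_demo():
    bus = ToySignalBus()
    bus.when("Plan.Read", reader)
    bus.when("reader.done", report)  # runs once reader has finished
    asyncio.run(bus.emit("Plan.Read", {"task": "read"}))
    return bus.log
```

Calling run_demo() returns the emission log, which shows the original event followed by the completion signals that each handler produced in this toy model.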
emit + when
python
from agently import TriggerFlow, TriggerFlowEventData

flow = TriggerFlow()


@flow.chunk
async def planner(data: TriggerFlowEventData):
    await data.async_emit("Plan.Read", {"task": "read"})
    await data.async_emit("Plan.Write", {"task": "write"})
    return "plan done"


@flow.chunk
async def reader(data: TriggerFlowEventData):
    return f"read: {data.value['task']}"


@flow.chunk
async def writer(data: TriggerFlowEventData):
    return f"write: {data.value['task']}"


flow.to(planner).end()
flow.when("Plan.Read").to(reader).collect("plan", "read")
flow.when("Plan.Write").to(writer).collect("plan", "write").end()

Trigger sources
when() can listen to:
python
flow.when("Plan.Read")               # event
flow.when({"runtime_data": "user"})  # runtime_data
flow.when({"flow_data": "env"})      # flow_data

Naming
- Use Domain.Action (e.g. Plan.Read, Tool.Run)
- Avoid overloading a single event name
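
The Domain.Action convention can be checked mechanically before an event name is used. The sketch below is a hypothetical helper, not part of Agently: the is_domain_action function and the exact pattern (two dot-separated parts, each starting with an uppercase letter) are assumptions about what the convention implies.

```python
import re

# Assumed pattern: exactly two dot-separated parts, each starting uppercase.
EVENT_NAME = re.compile(r"[A-Z][A-Za-z0-9]*\.[A-Z][A-Za-z0-9]*")


def is_domain_action(name: str) -> bool:
    """Return True if name looks like Domain.Action (hypothetical check)."""
    return EVENT_NAME.fullmatch(name) is not None
```

A project could run such a check in tests over its list of event names, so that names like plan.read or a bare read are caught early.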