事件驱动与信号编排
TriggerFlow 的底层逻辑就是 when‑emit 信号机制。emit() 发信号,when() 监听信号,to() 把处理任务绑定到“当前信号”。处理完成后,Chunk 会 emit 自己的完成信号,触发下游链路。
emit + when
python
from agently import TriggerFlow, TriggerFlowEventData
flow = TriggerFlow()
@flow.chunk
async def planner(data: TriggerFlowEventData):
await data.async_emit("Plan.Read", {"task": "read"})
await data.async_emit("Plan.Write", {"task": "write"})
return "plan done"
@flow.chunk
async def reader(data: TriggerFlowEventData):
return f"read: {data.value['task']}"
@flow.chunk
async def writer(data: TriggerFlowEventData):
return f"write: {data.value['task']}"
@flow.chunk
def print_collect(data: TriggerFlowEventData):
print("[collect]", data.value)
flow.to(planner).end()
flow.when("Plan.Read").to(reader).collect("plan", "read")
(
flow.when("Plan.Write")
.to(writer)
.collect("plan", "write")
.to(print_collect)
.end()
)
flow.start("go", wait_for_result=False)when 监听的三类触发
when() 可监听三类信号:
python
flow.when("Plan.Read") # event
flow.when({"runtime_data": "user"}) # runtime_data
flow.when({"flow_data": "env"}) # flow_datato 的完成信号
to() 对应的 Chunk 执行结束后,会 emit 自己的完成信号(Chunk[xxx]-<id>)。这使得链式编排本质上是“信号 → 任务 → 信号”的循环。
事件命名建议
- 使用
Domain.Action形式,例如Plan.Read、Tool.Run - 避免单一事件名过多复用,便于排查
循环事件
事件可以用来构建循环:
python
from agently import TriggerFlow, TriggerFlowEventData
flow = TriggerFlow()
@flow.chunk
def start_loop(_: TriggerFlowEventData):
return "start"
@flow.chunk
async def loop_step(data: TriggerFlowEventData):
await data.async_emit("LoopEnd", "ok")
return "loop"
@flow.chunk
def finish(data: TriggerFlowEventData):
return f"done: {data.value}"
flow.to(start_loop)
flow.when("Loop").to(loop_step)
flow.when("LoopEnd").to(finish).end()