Skip to content

事件驱动与信号编排

TriggerFlow 的底层逻辑就是 when‑emit 信号机制emit() 发信号,when() 监听信号,to() 把处理任务绑定到“当前信号”。处理完成后,Chunk 会 emit 自己的完成信号,触发下游链路。

emit + when

python
from agently import TriggerFlow, TriggerFlowEventData

flow = TriggerFlow()

@flow.chunk
async def planner(data: TriggerFlowEventData):
  await data.async_emit("Plan.Read", {"task": "read"})
  await data.async_emit("Plan.Write", {"task": "write"})
  return "plan done"

@flow.chunk
async def reader(data: TriggerFlowEventData):
  return f"read: {data.value['task']}"

@flow.chunk
async def writer(data: TriggerFlowEventData):
  return f"write: {data.value['task']}"

@flow.chunk
def print_collect(data: TriggerFlowEventData):
  print("[collect]", data.value)

flow.to(planner).end()
flow.when("Plan.Read").to(reader).collect("plan", "read")
(
  flow.when("Plan.Write")
  .to(writer)
  .collect("plan", "write")
  .to(print_collect)
  .end()
)

flow.start("go", wait_for_result=False)

when 监听的三类触发

when() 可监听三类信号:

python
flow.when("Plan.Read")               # event
flow.when({"runtime_data": "user"})  # runtime_data
flow.when({"flow_data": "env"})      # flow_data

to 的完成信号

to() 对应的 Chunk 执行结束后,会 emit 自己的完成信号(Chunk[xxx]-<id>)。这使得链式编排本质上是“信号 → 任务 → 信号”的循环。

事件命名建议

  • 使用 Domain.Action 形式,例如 Plan.ReadTool.Run
  • 避免单一事件名过多复用,便于排查

循环事件

事件可以用来构建循环:

python
from agently import TriggerFlow, TriggerFlowEventData

flow = TriggerFlow()

@flow.chunk
def start_loop(_: TriggerFlowEventData):
  return "start"

@flow.chunk
async def loop_step(data: TriggerFlowEventData):
  await data.async_emit("LoopEnd", "ok")
  return "loop"

@flow.chunk
def finish(data: TriggerFlowEventData):
  return f"done: {data.value}"

flow.to(start_loop)
flow.when("Loop").to(loop_step)
flow.when("LoopEnd").to(finish).end()