Skip to content

事件、信号与外部注入

TriggerFlow 的执行单位不是“节点跳转”,而是“信号到达 -> handler 触发”。

适合什么时候读

  • 你已经理解了 TriggerFlow 的基本线性流程
  • 你开始使用 emit()when() 或外部 execution.emit(...)
  • 你想理解内部事件和外部事件为什么能汇合到同一个运行模型里

你会学到什么

  • TriggerFlow 的信号来源有哪些
  • emit() / when() 怎么配合
  • signal_infodeclare_emits() 为什么重要

TIP

事件驱动一旦进入真实工程,推荐默认使用异步信号链路:chunk 内优先 async_emit(...),外部消费者优先配合 runtime_stream 和异步执行入口。这样更容易把模型的 instant 结构化流式结果接成稳定事件,而不是把同步桥接层塞进事件循环。

内部与外部事件如何汇合

这张图放在正文里,是为了说明:外部 execution.emit(...) 和内部 data.async_emit(...) 最终都会进入同一个 signal router。

常见信号来源

  • chunk 完成后的 continuation signal
  • chunk 内显式 emit() / async_emit()
  • 外部 execution.emit(...)
  • set_runtime_data() 触发的 runtime_data 信号
  • set_flow_data() 触发的 flow_data 信号

最小示例

python
from agently import TriggerFlow, TriggerFlowRuntimeData

flow = TriggerFlow()

@flow.chunk("planner")
async def planner(data: TriggerFlowRuntimeData):
    await data.async_emit("Plan.Read", {"task": "read"})
    await data.async_emit("Plan.Write", {"task": "write"})
    return "planned"

flow.to(planner).end()
flow.when("Plan.Read").to(lambda data: f"read:{data.value['task']}")
flow.when("Plan.Write").to(lambda data: f"write:{data.value['task']}")

signal_info 有什么用

在 handler 里可以读取:

  • data.signal
  • data.signal_id
  • data.signal_source
  • data.signal_meta
  • data.signal_info

这类信息通常用于排障、审计和 resume 场景。

declare_emits() 什么时候用

如果 handler 内部会发出业务事件,而这些事件无法从静态链路自动推断,就应该显式声明:

python
chunk = flow.chunk("notify")(notify)
chunk.declare_emits("Alert", "ApprovalRequest")

常见误区

  • 还按 DAG 思维理解 TriggerFlow,忽略了信号模型。
  • 把内部实现触发器直接暴露成外部系统协议。
  • 没有声明关键 emits,导致可视化和配置导出都不清楚。

下一步去哪

  • agently-triggerflow