Skip to content

TriggerFlow 核心概念

TriggerFlow 把流程拆解为信号、节点与运行态。理解这些概念,才能正确组织复杂编排。

Flow 与 Execution

  • TriggerFlow(Flow):流程定义本身(蓝图 + 规则)。
  • Execution(执行实例):一次运行的上下文与状态,每次 start() 都会创建。

信号与触发类型

TriggerFlow 的触发信号分三类:

  • eventemit() 产生的事件信号
  • runtime_dataset_runtime_data() 变更 key 时产生信号
  • flow_dataset_flow_data() 变更 key 时产生信号(影响所有执行实例)

注意:flow_data 会影响当前 Flow 下所有正在执行未来将要执行的 Execution。除非场景非常确定,一般不推荐使用。尤其当你把 Flow 作为服务接口的基础对象,为不同用户请求创建 Execution 时,避免使用 flow_data,否则容易造成用户数据泄露或交叉污染。

这三类信号都可以被 when() 捕获。

to 与 Chunk(完成信号)

to() 的本质是“把处理任务挂到当前触发信号上”。任务执行完成后,会自动发出一个 完成信号

  • 每个 Chunk 都有独立 trigger(形如 Chunk[xxx]-<id>
  • 上游信号触发 Chunk 执行
  • Chunk 完成后 emit 自己的完成信号
  • 下游既可以继续 to(),也可以通过 when() 监听

Chunk:显式命名与规范写法

在工程实践里,建议先用 @flow.chunk 定义“具名节点”,再用 flow.to() / flow.when().to() 组织流程。这样既可复用,也更易读。

to() 直接传函数是便利写法,适合快速验证;规范写法是把函数提升为 Chunk,再按名称挂接。

python
from agently import TriggerFlow, TriggerFlowEventData

flow = TriggerFlow()

@flow.chunk("normalize_input")
async def normalize(data: TriggerFlowEventData):
  return str(data.value).strip()

@flow.chunk
async def greet(data: TriggerFlowEventData):
  return f"Hello, {data.value}"

flow.to("normalize_input").to(greet).end()

execution = flow.create_execution()
result = execution.start(" Agently ")
print(result)

when(事件订阅)

when() 是事件订阅器,支持:

  • 单事件:when("Plan.Read")
  • 运行态:when({"runtime_data": "user"})
  • 多事件聚合:when({"event": ["A", "B"]}, mode="and")

多事件聚合会生成一个新的内部事件(When-xxx),用于后续链路。

TriggerFlowEventData

节点入参为 TriggerFlowEventData,包含:

  • value:当前事件 payload
  • event / trigger_event
  • type / trigger_type
  • 运行态方法:set_runtime_data / get_runtime_data
  • 事件方法:emit / async_emit
  • 流式输出:put_into_stream / stop_stream

collect(分支聚合)

collect() 用于多分支结果汇合,等待多个分支填充后再触发 Collect-xxx 事件。

python
from agently import TriggerFlow, TriggerFlowEventData

flow = TriggerFlow()

@flow.chunk
def reader(data: TriggerFlowEventData):
  return f"read: {data.value}"

@flow.chunk
def writer(data: TriggerFlowEventData):
  return f"write: {data.value}"

@flow.chunk
def print_collect(data: TriggerFlowEventData):
  print(data.value)

flow.when("Plan.Read").to(reader).collect("plan", "read")
flow.when("Plan.Write").to(writer).collect("plan", "write").to(print_collect).end()

runtime_stream(运行时流)

运行过程中可用 put_into_stream() 输出状态片段,通过 get_runtime_stream() 实时消费。

蓝图(BluePrint)

Flow 结构可 save_blue_print() 导出,并用 load_blue_print() 复用。

从 token 输出到实时信号

这部分提供 TriggerFlow 信号逻辑的背景,详见:从 token 输出到实时信号