Skip to content

when 事件分支

when() 可以等待 Chunk 完成信号runtime_data 变化自定义事件,适合事件驱动的分支场景。

场景:等待某个 Chunk 完成后再继续

适合“阶段联动”:上游任务完成后,下游任务才启动。

python
from agently import TriggerFlow, TriggerFlowEventData

flow = TriggerFlow()

@flow.chunk
async def parse(data: TriggerFlowEventData):
  return f"parsed:{data.value}"

@flow.chunk
async def build(data: TriggerFlowEventData):
  return f"built:{data.value}"

flow.to(parse)
flow.when(parse).to(build).end()

print(flow.start("doc"))

输出:

text
built:parsed:doc

场景:等待 runtime_data 变化

适合“状态到齐再触发”的场景,例如用户信息写入后再开始后续流程。

python
from agently import TriggerFlow, TriggerFlowEventData

flow = TriggerFlow()

@flow.chunk
def print_user(data: TriggerFlowEventData):
  print(f"user:{data.value['id']}")

flow.when({"runtime_data": "user"}).to(print_user)

execution = flow.create_execution()
execution.set_runtime_data("user", {"id": "u-1"})

输出:

text
user:u-1

场景:等待自定义事件(emit)

适合外部信号驱动(消息系统、Webhook、用户操作)。

python
from agently import TriggerFlow, TriggerFlowEventData

flow = TriggerFlow()

@flow.chunk
def print_join(data: TriggerFlowEventData):
  print(f"join:{data.value['id']}")

flow.when("User.Join").to(print_join)

execution = flow.create_execution()
execution.emit("User.Join", {"id": "u-9"})

输出:

text
join:u-9

when 的 mode:and / or / simple_or

when() 可同时监听多个信号,mode 决定触发逻辑:

  • and:全部到齐才触发,输出聚合字典
  • or:任意一个触发,输出 (type, event, value)
  • simple_or:任意一个触发,只输出 value

mode="and"

python
from agently import TriggerFlow, TriggerFlowEventData

flow = TriggerFlow()

@flow.chunk
def print_value(data: TriggerFlowEventData):
  print(data.value)

flow.when({"event": ["A", "B"]}, mode="and").to(print_value)

execution = flow.create_execution()
execution.emit("A", 1)
execution.emit("B", 2)

输出:

text
{'event': {'A': 1, 'B': 2}}

mode="or"

python
from agently import TriggerFlow, TriggerFlowEventData

flow = TriggerFlow()

@flow.chunk
def print_value(data: TriggerFlowEventData):
  print(data.value)

flow.when({"event": ["A", "B"]}, mode="or").to(print_value)

execution = flow.create_execution()
execution.emit("A", 1)
execution.emit("B", 2)

输出:

text
('event', 'A', 1)
('event', 'B', 2)

mode="simple_or"

python
from agently import TriggerFlow, TriggerFlowEventData

flow = TriggerFlow()

@flow.chunk
def print_value(data: TriggerFlowEventData):
  print(data.value)

flow.when({"event": ["A", "B"]}, mode="simple_or").to(print_value)

execution = flow.create_execution()
execution.emit("A", 1)
execution.emit("B", 2)

输出:

text
1
2