Skip to content

基本流程与信号链

TriggerFlow 的“链式写法”本质是信号链:to() 把处理任务挂到当前信号上,任务完成后 emit 完成信号,下一段 to() 继续等待该信号。

线性链路

python
from agently import TriggerFlow, TriggerFlowEventData

flow = TriggerFlow()

@flow.chunk
async def normalize(data: TriggerFlowEventData):
  return str(data.value).strip()

@flow.chunk
async def greet(data: TriggerFlowEventData):
  return f"Hello, {data.value}"

flow.to(normalize).to(greet).end()
result = flow.start(" Agently ")
print(result)

直接在 to() 里传函数是便利写法,适合快速验证;更规范的方式见「Chunk 概念」部分。

side_branch:不阻塞的旁路

旁路用于“记录、监控、打印”之类的非主流程工作。它不改变当前信号链:

python
flow = TriggerFlow()

@flow.chunk
async def main_task(data: TriggerFlowEventData):
  return f"main: {data.value}"

@flow.chunk
async def side_task(data: TriggerFlowEventData):
  print(f"[side] {data.value}")
  return "side done"

@flow.chunk
def print_main(data: TriggerFlowEventData):
  print("[main]", data.value)

(
  flow.to(main_task)
  .____(print_info=True, show_value=True)
  .side_branch(side_task)
  .to(print_main)
  .end()
)