TriggerFlow 编排 Playbook
场景
多步流程需要条件分支、路由、并发、异步与状态信号。
1. 编排能力地图
如何阅读这张图
- 这张图说明 TriggerFlow 的能力不是一个个孤立 API,而是一套可以组合的编排元件。
- 真正的工程价值来自把主链、并发、信号、收敛和旁路输出组合成可观察的流程。
2. 需要使用功能(关键特性)
to:主流程串联if_condition/match:条件与路由for_each:列表批处理batch+concurrency:并发控制runtime_data:状态信号
3. 具体操作
- 先用
to组织主链路。 - 用
if_condition/match做路由判断。 - 用
for_each或batch处理并发。
4. 完整代码(并发 + runtime_stream)
python
import asyncio
from agently import TriggerFlow, TriggerFlowRuntimeData
flow = TriggerFlow()
@flow.chunk("normalize")
async def normalize(data: TriggerFlowRuntimeData):
topic = str(data.value).strip()
data.set_runtime_data("topic", topic)
data.put_into_stream({"stage": "normalized", "topic": topic})
return topic
@flow.chunk("fetch_facts")
async def fetch_facts(data: TriggerFlowRuntimeData):
await asyncio.sleep(0.05)
data.put_into_stream({"stage": "facts_ready", "topic": data.value})
return f"facts({data.value})"
@flow.chunk("fetch_risks")
async def fetch_risks(data: TriggerFlowRuntimeData):
await asyncio.sleep(0.03)
data.put_into_stream({"stage": "risks_ready", "topic": data.value})
return f"risks({data.value})"
@flow.chunk("compile_report")
async def compile_report(data: TriggerFlowRuntimeData):
topic = data.get_runtime_data("topic")
report = {
"topic": topic,
"facts": data.value.get("fetch_facts"),
"risks": data.value.get("fetch_risks"),
}
data.put_into_stream({"stage": "compiled", "report": report})
data.stop_stream()
return report
flow.to(normalize)
flow.when({"runtime_data": "topic"}).batch(fetch_facts, fetch_risks, concurrency=2).to(compile_report).end()
execution = flow.create_execution(concurrency=2)
for item in execution.get_runtime_stream("Agently TriggerFlow", timeout=5):
print("STREAM:", item)
result = execution.get_result(timeout=5)
print("RESULT:", result)5. 真实输出
text
STREAM: {'stage': 'facts_ready', 'topic': 'Agently TriggerFlow'}
STREAM: {'stage': 'risks_ready', 'topic': 'Agently TriggerFlow'}
STREAM: {'stage': 'compiled', 'report': {'topic': 'Agently TriggerFlow', 'facts': 'facts(Agently TriggerFlow)', 'risks': 'risks(Agently TriggerFlow)'}}
RESULT: {'topic': 'Agently TriggerFlow', 'facts': 'facts(Agently TriggerFlow)', 'risks': 'risks(Agently TriggerFlow)'}6. 验证点
- 并发任务全部完成。
- runtime_stream 有阶段性事件。
- 聚合结果符合预期。