交互层与主动任务 Playbook
企业 Agent 不是只在接口返回最后一句话。用户、客服、运营和审批人更关心过程:任务是不是开始了,卡在哪一步,是否需要补充材料,系统何时会主动提醒。
Agently 可以把模型增量、流程进度、等待恢复和运行观测分到不同通道里。应用侧再把这些通道接到 Web UI、SSE、WebSocket、IM 网关、worker、scheduler 或事件系统。
先分清三类事件
| 类别 | 给谁看 | Agently 入口 | 典型内容 |
|---|---|---|---|
| 产品过程事件 | 用户、客服、运营、审批人 | ModelResponse instant stream / TriggerFlow runtime stream | 字段生成、步骤状态、审批提示、报告章节完成 |
| 流程控制事件 | workflow runtime | TriggerFlow emit / when / pause_for / continue_with | 某 chunk 完成、等待审批、外部 webhook 恢复 |
| 运行观测事件 | 开发、运维、评估系统 | Event Center / RuntimeEvent / DevTools | 模型请求、Action 调用、执行环境、flow lifecycle |
产品界面不要直接消费 observation event;流程控制也不要靠 UI 文案推动。面向用户的事件应该是稳定业务字段,面向运维的事件应该记录运行事实。
输出层的最小形态
HTTP request
-> create AgentExecution or TriggerFlow execution
-> stream product events through SSE / WebSocket / IM gateway
-> save final data / snapshot / artifact refs
-> return business response如果只是字段级生成,用 instant stream 映射成 UI patch;如果是多步流程,用 TriggerFlow runtime stream 推稳定阶段事件。最终入库仍然读取最终 get_data()、close snapshot 或 Workspace artifact。
推荐产品事件结构
{
"event_type": "report_section_ready",
"task_id": "task_20260610_001",
"execution_id": "exec_8f2",
"stage": "summarize",
"title": "行业新闻摘要",
"artifact_ref": "workspace://artifacts/daily-report.md",
"status": "ready"
}建议保留这些字段:
| 字段 | 用途 |
|---|---|
event_type | 前端或 IM 网关按类型渲染 |
task_id | 业务任务归属 |
execution_id | 当前运行归属,便于恢复和排查 |
stage | 用户能理解的阶段名 |
status | running、waiting、ready、failed 等稳定状态 |
artifact_ref | 指向 Workspace 里的报告、截图、表单或日志 |
不要把模型 parser path、provider token、内部 chunk 名称直接作为产品事件契约。
SSE / WebSocket / IM 怎么选
| 通道 | 适合场景 | 注意事项 |
|---|---|---|
| HTTP POST | 一次短任务,只需要最终结果 | 返回结构化 data 或 close snapshot |
| SSE | 单向过程流、报告生成、工单进度、长响应 | server 负责 stream 关闭,客户端忽略未知事件 |
| WebSocket | 多轮聊天、持续交互、同连接内有多次输入 | 需要会话归属和连接断开恢复策略 |
| IM 网关 | 企业协作 IM、邮件或通知系统 | IM 是交互层,不是 workflow owner |
| Worker / Queue | 主动任务、批处理、异步长任务 | 应用持久化任务、幂等和调度状态 |
FastAPIHelper 可以承接 HTTP / SSE / WebSocket 的封装;IM 网关和 scheduler 通常由应用系统拥有,再调用 Agently execution。
主动任务的边界
主动任务不是“在接口里睡一会儿”。它通常由外部触发源启动或推进:
- scheduler:定时日报、每周复盘、固定检查;
- webhook:外部系统状态变化、审批回调、支付结果;
- polling:业务系统没有推送能力时,定期检查;
- heartbeat:长任务保活、超时提醒、补偿任务。
推荐结构:
scheduler / webhook / queue
-> load task ownership and idempotency key
-> create or restore TriggerFlow execution
-> emit event or continue interrupt
-> stream / notify product events
-> save final snapshot and Workspace refsAgently 负责执行、编排、等待、恢复和过程输出;应用系统负责触发源、任务表、幂等、租户、鉴权、队列和重试策略。
一个定时日报例子
async def run_daily_report_job(topic: str, task_id: str):
execution = report_flow.create_execution(
auto_close=False,
runtime_resources={"task_id": task_id},
)
await execution.async_start({"topic": topic, "task_id": task_id})
close_task = asyncio.create_task(execution.async_close())
async for item in execution.get_async_runtime_stream(timeout=None):
await notify_ui_or_im({
"event_type": item.get("type", "workflow_event"),
"task_id": task_id,
"execution_id": execution.result.get_meta()["execution_id"],
"stage": item.get("stage"),
"status": item.get("status", "running"),
"artifact_ref": item.get("artifact_ref"),
})
snapshot = await close_task
await save_task_result(task_id, snapshot)这里 scheduler 只负责触发;TriggerFlow 负责日报流程;runtime stream 负责把过程交给产品界面或 IM;最终结果保存到应用任务表和 Workspace。
等待和恢复
需要人工、webhook 或外部系统补充输入时,用 TriggerFlow pause/resume。
flow reaches approval chunk
-> pause_for(type="approval", resume_to="next")
-> product event: waiting_for_approval
-> app stores execution id + interrupt id
-> approver submits payload
-> app calls continue_with(interrupt_id, payload)恢复入口应该保存父 execution id 和 root interrupt id。前端或 IM 只负责展示和收集输入,不直接承担流程恢复逻辑。
不要这样做
| 误用 | 修正 |
|---|---|
| 长任务只等最终结果,过程中没有任何可见状态 | runtime stream 输出阶段级产品事件 |
前端直接消费 instant parser path 作为长期协议 | 在 workflow 或 service 层映射成稳定业务事件 |
| 用 Event Center 事件驱动产品 UI | Event Center 做日志/观测;UI 用产品事件 |
| 把 scheduler 写进 HTTP request handler | 主动触发由 scheduler / queue / worker 拥有 |
没有 task_id / execution_id / trace_id | 任务无法归属、恢复和排查 |
| IM 机器人自己决定工作流走向 | IM 只做交互层,流程由 Agently execution 和应用状态驱动 |
上线检查
| 检查项 | 通过标准 |
|---|---|
| 过程事件 | 用户能看到关键阶段、等待点和失败原因 |
| 最终状态 | 最终持久化来自 data / snapshot / Workspace artifact,不来自临时 patch |
| 通道分工 | UI stream、TriggerFlow signal、Event Center 不混用 |
| 主动触发 | scheduler / webhook / queue 与 API 请求处理分离 |
| 恢复入口 | interrupt id、execution id、恢复 payload 和幂等键可追踪 |
| 通知策略 | IM/WebSocket/SSE 断开后仍能从任务状态恢复 |
| 观测证据 | RuntimeEvent / DevTools / trace 能定位模型、Action、flow 或环境问题 |