FastAPI 服务封装
原型脚本能跑,不代表业务系统能消费。服务端需要明确的请求体、响应形态、错误包装、流式协议,以及长流程执行结果怎么返回。
FastAPIHelper 处理的是这层封装。它是 FastAPI 的子类,可以把 Agent、ModelRequest、TriggerFlow、TriggerFlowExecution 或 generator 函数暴露成路由。
FastAPI 是承载,不是服务化本身
服务化真正要交付的是接口契约,不是某个 HTTP 框架。FastAPIHelper 可以快速把 Agently 能力暴露出去,但进入业务系统前,仍要把下面几件事定义清楚:
| 契约面 | 推荐做法 |
|---|---|
| 请求体 | 明确 data 和 options 的字段、类型、默认值和必填项 |
| 成功响应 | 只暴露业务方需要的字段,不直接泄露内部 snapshot 全量结构 |
| 错误响应 | 区分请求体错误、模型失败、Action 失败、流程 pending 和服务异常 |
| 流式过程 | 把 instant stream / runtime stream 投影成稳定产品事件 |
| 最终状态 | 临时 patch 用于 UI,最终入库仍从 get_data()、execution result 或 close snapshot 读取 |
| 版本和健康检查 | 路由加版本前缀,提供不依赖业务模块的 /health |
早期可以先用默认包装验证可行性。进入前端、客户系统或多服务调用后,建议用 service wrapper 或 response_warper 把内部运行事实投影成稳定 API。
最小 HTTP 端点
from agently import Agently
from agently.integrations.fastapi import FastAPIHelper
agent = Agently.create_agent()
app = FastAPIHelper(response_provider=agent)
app.use_post("/chat")运行:
uvicorn module:app默认 POST body:
{
"data": {
"input": "你好"
},
"options": {}
}只创建 FastAPIHelper(...) 不会自动注册路由。需要显式调用 use_post(...)、use_get(...)、use_sse(...) 或 use_websocket(...)。
默认响应形态
成功:
{
"status": 200,
"data": "...",
"msg": null
}错误:
{
"status": 422,
"data": null,
"msg": "...错误信息...",
"error": {
"type": "ValueError",
"message": "...",
"args": []
}
}默认状态码:
| 异常 | 状态码 |
|---|---|
ValueError | 422 |
TimeoutError | 504 |
| 其他异常 | 400 |
返回值会经过 fastapi.encoders.jsonable_encoder,保持 JSON-safe。
把结构化字段流给前端
如果前端希望边生成边看到字段,可以把 async generator 暴露成 SSE:
from agently.integrations.fastapi import FastAPIHelper
async def stream_answer(request_data):
result = (
agent
.input(request_data["data"])
.output({
"title": (str, "标题", True),
"body": (str, "正文", True),
})
.get_result()
)
async for item in result.get_async_generator(type="instant"):
if item.delta:
yield {
"path": item.path,
"delta": item.delta,
"done": item.is_complete,
}
app = FastAPIHelper(response_provider=stream_answer)
app.use_sse("/answer/stream")每次 yield 的对象会被 JSON 编码后作为流式 chunk 发送。给 SSE 消费者时,helper 负责分帧。
最终业务数据仍应从 result 的最终 get_data() / async_get_data() 读取和保存。流式 item 更适合 UI 临时状态。
包装 TriggerFlow
response_provider 是 TriggerFlow 时,helper 会为每个请求创建 execution。响应里的 data 直接承载 close snapshot:
from agently import TriggerFlow
flow = TriggerFlow(name="answer")
# 定义 flow chunk ...
app = FastAPIHelper(response_provider=flow)
app.use_post("/answer")如果前端只需要 snapshot 的一部分,用自定义 response_warper 投影:
def project_snapshot(response_or_exception):
if isinstance(response_or_exception, Exception):
return {"status": 400, "data": None, "msg": str(response_or_exception)}
snapshot = response_or_exception
if isinstance(snapshot, dict):
return {
"status": 200,
"data": {"answer": snapshot.get("answer")},
"msg": None,
}
return {"status": 200, "data": snapshot, "msg": None}
app = FastAPIHelper(response_provider=flow, response_warper=project_snapshot)
app.use_post("/answer")传入自定义 response_warper 后,成功和异常两条路径都由这个函数负责;默认 {status, data, msg, error} 包装不会再自动叠加。
WebSocket
.use_websocket("/ws") 会注册 WebSocket 路由。客户端连接后发送:
{
"data": "...",
"options": {}
}再接收 stream item。聊天 UI、单连接多轮交互和需要持续推送的界面适合这个入口。
什么时候写自定义 wrapper
默认包装适合早期服务。进入业务系统后,通常要自己整形:
- 只暴露前端需要的字段。
- 把 close snapshot 投影成业务对象。
- 给错误加业务码。
- 对响应做额外校验。
- 保持和已有 API 契约一致。
wrapper 是一个函数:
def my_warper(response_or_exception):
...
return serializable_dict成功值和异常都会传进来。换掉 wrapper 后,两条路径都由应用负责。
推荐配方
| 目标 | 做法 |
|---|---|
| 一个 agent,一个 HTTP 端点 | FastAPIHelper(response_provider=agent).use_post("/chat") |
| 字段级流式 UI | generator 里消费 get_async_generator(type="instant"),再 use_sse(...) |
| 长流程返回最终状态 | response_provider=flow,让 data 承载 close snapshot |
| 长流程推中间进度 | 自定义 generator 消费 TriggerFlow runtime stream |
| 产品界面或 IM 要稳定过程事件 | service 层把 instant / runtime stream 映射成业务事件 |
| 严格业务响应 | 自定义 response_warper 投影和校验 |
常见误用
- 以为
FastAPIHelper(...)会自动注册路由。需要显式use_post/use_sse等。 - 以为有 HTTP 端点就完成了服务化。真正的交付边界是请求、响应、错误、流式过程和版本契约。
- 在服务端继续使用同步 demo 写法。服务、SSE、WebSocket 优先 async。
- TriggerFlow 响应里期待固定
result字段。当前data承载 close snapshot,需要业务形态时自己投影。 - 自定义 wrapper 只处理成功,不处理异常。传入 wrapper 后错误路径也由你负责。
- 把流式 patch 直接当最终业务数据保存。最终状态仍然从 result 或 snapshot 读取。
- 把 scheduler、polling 或 webhook 逻辑塞进 HTTP handler。主动任务应由应用的 scheduler / worker / queue 拥有,再调用 Agently execution。
下一步
- 为什么服务路径优先 async:Async First
- 字段级流式结果:模型响应
- TriggerFlow runtime stream:事件与流
- 产品事件、IM 和主动任务:交互层与主动任务 Playbook
- 带 Action 的 agent 服务:Actions 概览