Skip to content

FastAPI 服务封装

原型脚本能跑,不代表业务系统能消费。服务端需要明确的请求体、响应形态、错误包装、流式协议,以及长流程执行结果怎么返回。

FastAPIHelper 处理的是这层封装。它是 FastAPI 的子类,可以把 AgentModelRequestTriggerFlowTriggerFlowExecution 或 generator 函数暴露成路由。

FastAPI 是承载,不是服务化本身

服务化真正要交付的是接口契约,不是某个 HTTP 框架。FastAPIHelper 可以快速把 Agently 能力暴露出去,但进入业务系统前,仍要把下面几件事定义清楚:

契约面推荐做法
请求体明确 dataoptions 的字段、类型、默认值和必填项
成功响应只暴露业务方需要的字段,不直接泄露内部 snapshot 全量结构
错误响应区分请求体错误、模型失败、Action 失败、流程 pending 和服务异常
流式过程把 instant stream / runtime stream 投影成稳定产品事件
最终状态临时 patch 用于 UI,最终入库仍从 get_data()、execution result 或 close snapshot 读取
版本和健康检查路由加版本前缀,提供不依赖业务模块的 /health

早期可以先用默认包装验证可行性。进入前端、客户系统或多服务调用后,建议用 service wrapper 或 response_warper 把内部运行事实投影成稳定 API。

最小 HTTP 端点

python
from agently import Agently
from agently.integrations.fastapi import FastAPIHelper

agent = Agently.create_agent()

app = FastAPIHelper(response_provider=agent)
app.use_post("/chat")

运行:

bash
uvicorn module:app

默认 POST body:

json
{
  "data": {
    "input": "你好"
  },
  "options": {}
}

只创建 FastAPIHelper(...) 不会自动注册路由。需要显式调用 use_post(...)use_get(...)use_sse(...)use_websocket(...)

默认响应形态

成功:

json
{
  "status": 200,
  "data": "...",
  "msg": null
}

错误:

json
{
  "status": 422,
  "data": null,
  "msg": "...错误信息...",
  "error": {
    "type": "ValueError",
    "message": "...",
    "args": []
  }
}

默认状态码:

异常状态码
ValueError422
TimeoutError504
其他异常400

返回值会经过 fastapi.encoders.jsonable_encoder,保持 JSON-safe。

把结构化字段流给前端

如果前端希望边生成边看到字段,可以把 async generator 暴露成 SSE:

python
from agently.integrations.fastapi import FastAPIHelper


async def stream_answer(request_data):
    result = (
        agent
        .input(request_data["data"])
        .output({
            "title": (str, "标题", True),
            "body": (str, "正文", True),
        })
        .get_result()
    )

    async for item in result.get_async_generator(type="instant"):
        if item.delta:
            yield {
                "path": item.path,
                "delta": item.delta,
                "done": item.is_complete,
            }

app = FastAPIHelper(response_provider=stream_answer)
app.use_sse("/answer/stream")

每次 yield 的对象会被 JSON 编码后作为流式 chunk 发送。给 SSE 消费者时,helper 负责分帧。

最终业务数据仍应从 result 的最终 get_data() / async_get_data() 读取和保存。流式 item 更适合 UI 临时状态。

包装 TriggerFlow

response_providerTriggerFlow 时,helper 会为每个请求创建 execution。响应里的 data 直接承载 close snapshot:

python
from agently import TriggerFlow

flow = TriggerFlow(name="answer")
# 定义 flow chunk ...

app = FastAPIHelper(response_provider=flow)
app.use_post("/answer")

如果前端只需要 snapshot 的一部分,用自定义 response_warper 投影:

python
def project_snapshot(response_or_exception):
    if isinstance(response_or_exception, Exception):
        return {"status": 400, "data": None, "msg": str(response_or_exception)}

    snapshot = response_or_exception
    if isinstance(snapshot, dict):
        return {
            "status": 200,
            "data": {"answer": snapshot.get("answer")},
            "msg": None,
        }

    return {"status": 200, "data": snapshot, "msg": None}


app = FastAPIHelper(response_provider=flow, response_warper=project_snapshot)
app.use_post("/answer")

传入自定义 response_warper 后,成功和异常两条路径都由这个函数负责;默认 {status, data, msg, error} 包装不会再自动叠加。

WebSocket

.use_websocket("/ws") 会注册 WebSocket 路由。客户端连接后发送:

json
{
  "data": "...",
  "options": {}
}

再接收 stream item。聊天 UI、单连接多轮交互和需要持续推送的界面适合这个入口。

什么时候写自定义 wrapper

默认包装适合早期服务。进入业务系统后,通常要自己整形:

  • 只暴露前端需要的字段。
  • 把 close snapshot 投影成业务对象。
  • 给错误加业务码。
  • 对响应做额外校验。
  • 保持和已有 API 契约一致。

wrapper 是一个函数:

python
def my_warper(response_or_exception):
    ...
    return serializable_dict

成功值和异常都会传进来。换掉 wrapper 后,两条路径都由应用负责。

推荐配方

目标做法
一个 agent,一个 HTTP 端点FastAPIHelper(response_provider=agent).use_post("/chat")
字段级流式 UIgenerator 里消费 get_async_generator(type="instant"),再 use_sse(...)
长流程返回最终状态response_provider=flow,让 data 承载 close snapshot
长流程推中间进度自定义 generator 消费 TriggerFlow runtime stream
产品界面或 IM 要稳定过程事件service 层把 instant / runtime stream 映射成业务事件
严格业务响应自定义 response_warper 投影和校验

常见误用

  • 以为 FastAPIHelper(...) 会自动注册路由。需要显式 use_post / use_sse 等。
  • 以为有 HTTP 端点就完成了服务化。真正的交付边界是请求、响应、错误、流式过程和版本契约。
  • 在服务端继续使用同步 demo 写法。服务、SSE、WebSocket 优先 async。
  • TriggerFlow 响应里期待固定 result 字段。当前 data 承载 close snapshot,需要业务形态时自己投影。
  • 自定义 wrapper 只处理成功,不处理异常。传入 wrapper 后错误路径也由你负责。
  • 把流式 patch 直接当最终业务数据保存。最终状态仍然从 result 或 snapshot 读取。
  • 把 scheduler、polling 或 webhook 逻辑塞进 HTTP handler。主动任务应由应用的 scheduler / worker / queue 拥有,再调用 Agently execution。

下一步