Event Center
Event Center 是 Agently 的框架级运行时事件通道。模型请求、Session 应用、Action 调用、Execution Environment、TriggerFlow lifecycle 等运行事实,都会以 RuntimeEvent 的形态交给 hook、日志 sink 或 DevTools bridge。
它和 TriggerFlow 的 emit / when 分工不同:emit 会改变一个 flow execution 内的下一步走向;RuntimeEvent 只是记录发生了什么。
注册一个 hook
from agently import Agently
captured = []
async def capture(event):
captured.append(event)
Agently.event_center.register_hook(
capture,
event_types="runtime.info",
hook_name="docs.capture",
)
emitter = Agently.event_center.create_emitter("Docs")
await emitter.async_info("hello")
Agently.event_center.unregister_hook("docs.capture")event_types 可以是字符串、字符串列表或 None。传 None 时,hook 接收所有事件。同步函数也能注册;Event Center 会统一转成 async 调用。
自定义应用事件
应用和插件可以用自己的命名空间发送事件:
emitter = Agently.event_center.create_emitter(
"BillingWorker",
base_meta={"tenant": "demo"},
)
await emitter.async_emit(
"billing.invoice_created",
message="invoice created",
payload={"invoice_id": "inv-1"},
)Agently 官方事件使用 model.*、request.*、action.*、tool.*、session.*、agent_turn.*、triggerflow.*、execution_environment.* 等 namespace。自定义事件不要伪装成官方事件,也不要指望 Agently core 消费它们。
高频事件用摘要投递
模型流式增量这类事件频率很高。如果 hook 要把它们转发到远端日志或指标系统,可以给这个 hook 配摘要投递:
Agently.event_center.register_hook(
capture,
event_types="model.response.delta",
hook_name="docs.summary_capture",
delivery_policy={
"mode": "summary",
"dispatch": "await",
"emit_interval": 0.1,
"max_items": 20,
"high_frequency_only": True,
},
)摘要投递只影响当前 hook,不改变生产者发出的 RuntimeEvent,也不影响其他要求 raw 事件的 hook。摘要事件会带 meta["coalesced"]、coalesced_count、first_event_id 和 last_event_id。
只有出口有明确 flush / close 回收点时,再考虑 dispatch="background"。短脚本或 CLI 结束前,可以调用:
await Agently.event_center.async_flush(hook_name)这样可以排空摘要 buffer 和已跟踪的后台投递。
RuntimeEvent 结构
RuntimeEvent 的顶层字段来自 agently.types.data.event.RuntimeEvent。DevTools bridge 会把它投影成 ObservationEvent,序列化结构保持一致。
| 字段 | 含义 |
|---|---|
event_id | 事件 id,默认自动生成 |
event_type | 点路径,如 triggerflow.execution_started |
source | 事件来源 |
level | DEBUG / INFO / WARNING / ERROR / CRITICAL |
message | 人可读消息 |
payload | 结构化数据 |
error | 错误信息;异常会规范化为 ErrorInfo |
run | run lineage,包括 run_id、parent_run_id、session_id、execution_id |
meta | 附加元数据 |
timestamp | 毫秒时间戳 |
消费端关联请求、Session 或 TriggerFlow execution 时,优先读 run。不要从 message 文本里解析 id。
run 和 attempt 不要混
agent_turn 表示一次 Agent 面向用户或调用方的回合。attempt_index 表示同一次模型请求内部的重试序号。DevTools 或日志系统展示时,要把这两件事分开:
- 从
run.run_kind渲染agent_turn。 - 从
model_request事件的payload.attempt_index或run.meta.attempt_index读取模型重试 attempt。
这样排查问题时,才能看清是“用户发起了下一轮”,还是“同一轮里模型请求重试了一次”。
兼容事件
新代码使用 RuntimeEvent。ObservationEvent 是 DevTools bridge 的通讯投影。emit_observation / async_emit_observation 继续作为兼容 alias 可用。
TriggerFlow 历史事件前缀也保留别名:订阅 workflow.execution_started 可以收到 triggerflow.execution_started,订阅 trigger_flow.signal 可以收到 triggerflow.signal。新文档和新代码写 triggerflow.*。
Action Runtime 使用 action.* 作为主 namespace。兼容旧 tools 时,会额外发配对的 tool.* 事件,并带上 meta.compat_event_alias=True、meta.compat_alias_for 和 meta.primary_event_id,方便消费者去重。
Execution Environment 使用 execution_environment.*。当前 manager 事件包括 declared、approval_required、ensuring、ready、unhealthy、releasing、released 和 failed。
运行进展与卡死诊断
Event Center 是接收和分发层。liveness 进展会在高成本 hook 投递前更新,因此一个慢 hook 不应该挡住卡死诊断。
AgentExecution 会把诊断信息放在 async_get_meta()["diagnostics"]:
diagnostics["stages"]["events"]:最近 stage 进展。diagnostics["last_progress"]:最后一次被接受的进展事件。diagnostics["timeouts"]:硬截止超时。diagnostics["stalls"]:idle 无进展卡死。
本地排查可以临时挂 hook,或打开 .set_settings("debug", True) / .set_settings("debug", "detail")。问题定位后,把临时 debug hook 和 debug settings 移走。
消费端要 fail open
RuntimeEvent 是可扩展协议。自定义消费者和 DevTools 都应该宽容处理:
- 忽略未知顶层字段和未知
payload字段。 - 对未知
event_type不报错。 - 不把
payload当作全局固定 schema。 - 优先使用
run做关联,不解析message。
另见
- 观测概览 - 四条通道的职责
- TriggerFlow 事件与流 - flow 内部控制流与 runtime stream
- DevTools - 现成的观测、评估和交互 UI
- FastAPI 服务封装 - 把 runtime stream 转给客户端