Streaming + Structured Playbook
Scenario
You need live UI streaming and structured data at the same time.
Capability (key traits)
- Text streaming:
get_generator(type="delta") - Structured streaming:
get_generator(type="instant") - Single request reuse:
get_response()
Operations
- Define output schema with
output(). - Fix one request with
get_response(). - Consume
delta+instantconcurrently.
Full code
python
import asyncio
from agently import Agently
Agently.set_settings(
"OpenAICompatible",
{
"base_url": "http://localhost:11434/v1",
"model": "qwen2.5:7b",
"model_type": "chat",
},
).set_settings("request_options", {"temperature": 0.2}).set_settings("debug", False)
agent = Agently.create_agent()
response = (
agent.system(
"你是运营日报助手,输出结构化结果。"
)
.input(
"给出一个 3 步的增长实验建议,并提供一句话总结。"
)
.output(
{
"title": (str, "标题"),
"summary": (str, "一句话总结"),
"steps": [
{
"step": (str, "步骤"),
"owner": (str, "负责人角色"),
}
],
}
)
.get_response()
)
async def stream_text():
chunks = []
async for chunk in response.get_async_generator(type="delta"):
chunks.append(chunk)
return "".join(chunks)
async def stream_structured():
events = []
async for item in response.get_async_generator(type="instant"):
if item.is_complete:
events.append(f"{item.path} => {item.value}")
return events
async def main():
text_stream, structured_events = await asyncio.gather(
stream_text(),
stream_structured(),
)
print("TEXT_STREAM:")
print(text_stream)
print("\nSTRUCTURED_STREAM:")
for event in structured_events:
print(event)
if __name__ == "__main__":
asyncio.run(main())Real output
text
TEXT_STREAM:
{
"title": "3步增长实验建议",
"summary": "通过用户调研反馈、数据分析优化及AB测试营销策略,进一步增强用户活跃度。",
"steps": [
{
"step": "进行用户体验调研和问卷调查,了解用户对于现有功能使用体验的感受以及改进建议。",
"owner": "产品经理"
},
{
"step": "依据调研结果和现有数据分析潜在优化方向,并设计针对性的改进方案。",
"owner": "数据分析师"
},
{
"step": "通过AB测试对比不同营销策略的效果,持续监控效果以便及时调整。",
"owner": "市场推广负责人"
}
]
}
STRUCTURED_STREAM:
title => 3步增长实验建议
summary => 通过用户调研反馈、数据分析优化及AB测试营销策略,进一步增强用户活跃度。
steps[0].step => 进行用户体验调研和问卷调查,了解用户对于现有功能使用体验的感受以及改进建议。
steps[0].owner => 产品经理
steps[1].step => 依据调研结果和现有数据分析潜在优化方向,并设计针对性的改进方案。
steps[1].owner => 数据分析师
steps[2].step => 通过AB测试对比不同营销策略的效果,持续监控效果以便及时调整。
steps[2].owner => 市场推广负责人
steps[0] => {'step': '进行用户体验调研和问卷调查,了解用户对于现有功能使用体验的感受以及改进建议。', 'owner': '产品经理'}
steps[1] => {'step': '依据调研结果和现有数据分析潜在优化方向,并设计针对性的改进方案。', 'owner': '数据分析师'}
steps[2] => {'step': '通过AB测试对比不同营销策略的效果,持续监控效果以便及时调整。', 'owner': '市场推广负责人'}
steps => [{'step': '进行用户体验调研和问卷调查,了解用户对于现有功能使用体验的感受以及改进建议。', 'owner': '产品经理'}, {'step': '依据调研结果和现有数据分析潜在优化方向,并设计针对性的改进方案。', 'owner': '数据分析师'}, {'step': '通过AB测试对比不同营销策略的效果,持续监控效果以便及时调整。', 'owner': '市场推广负责人'}]Validation
- Both
deltaandinstantare consumed reliably. - Structured fields appear in
instantevents.