在大模型技术蓬勃发展的今天,我们常常会被“一切顺利”的表象所迷惑。就像 Cilia Madani 在其文章中描述的那样,一个基于 OpenAI Agents SDK 构建的,能够输出结构化 JSON 的投资建议 AI 代理,在非流式传输模式下运行完美。然而,一旦涉及到 流式传输,问题就接踵而至。本文将深入探讨 OpenAI Agents SDK 在处理 结构化输出 时,特别是涉及到 JSON 格式的复杂对象时,流式传输可能遇到的挑战,并提供相应的解决方案。我们将以投资建议场景为例,剖析从 pydantic 模型到流式输出的每一个环节,帮助开发者避开“无流”的幻觉,构建真正高效稳定的 AI 应用。
初始的完美:非流式结构化输出
在文章开篇,作者提到她的 AI 代理能够生成 结构化 JSON 形式的投资建议。这种结构化的优势在于便于机器解析和利用。文章中使用 pydantic
定义了一个 InvestmentAdvice
模型,包含 short_term
、medium_term
和 long_term
三个字段。
from pydantic import BaseModel
class InvestmentAdvice(BaseModel):
short_term: str
medium_term: str
long_term: str
#Agent的简单使用示例,非流式
agent = Agent(output_type=InvestmentAdvice)
output = agent.run("What should I invest in today?")
print(output.short_term)
在这个非流式传输的场景下,Agent.run()
方法一次性返回完整的 InvestmentAdvice
对象,我们可以直接访问其 short_term
属性。这种方式简单直观,对于简单的应用场景来说非常适用。但其缺点也显而易见:用户必须等待所有建议生成完毕才能看到第一条建议,响应时间较长,用户体验欠佳。
更深层次的问题在于,大型语言模型(LLM)生成答案的过程是逐步的,是一个“思考”的过程。如果我们能够将这个思考过程可视化,让用户逐步看到答案的生成,无疑能够大大提升用户的感知速度和信任感。这就是 流式传输 的意义所在。
流式传输的需求与挑战:用户体验至上
作者的需求是:尽快展示第一个投资建议(即 short_term
属性),同时将剩余的建议作为内部上下文,继续进行 agent 的后续运行。这个需求看似简单,实则蕴含了 流式传输 在 结构化输出 场景下的核心挑战。
挑战主要体现在以下几个方面:
- JSON 结构的完整性: 流式传输本质上是将 LLM 的输出进行分段,但 JSON 必须是一个完整的、符合语法的对象。如何在流式传输的过程中保证 JSON 的结构完整性,避免出现语法错误,是一个关键问题。
- pydantic 模型的适配性:
pydantic
模型通常用于验证和序列化完整的对象。如何在流式传输的过程中,逐步填充pydantic
模型,并在适当的时候进行验证和序列化,需要仔细设计。 - OpenAI Agents SDK 的限制: OpenAI Agents SDK 提供了一定的流式传输支持,但对于复杂结构化输出的支持可能存在限制。我们需要深入了解 SDK 的工作原理,才能找到合适的解决方案。
- 异步编程的复杂性: 流式传输通常涉及到异步编程,需要处理回调、事件循环等复杂概念。对于不熟悉异步编程的开发者来说,可能会增加开发难度。
流式传输的错误尝试与深刻教训
实现流式输出并非易事,稍有不慎就会陷入各种意想不到的陷阱。作者在文章中并未详细描述她尝试过的错误方法,但我们可以根据常识推断出一些常见的坑:
- 直接拼接字符串: 最简单粗暴的方法是直接拼接字符串,试图手动构建 JSON 对象。这种方法很容易出错,特别是在处理复杂的嵌套结构时。例如,如果
short_term
建议中包含特殊字符,就需要进行转义,否则会导致 JSON 解析失败。 - 过早地尝试解析 JSON: 如果在 JSON 对象尚未完整生成时就尝试解析,必然会导致解析错误。例如,如果只收到了
{"short_term": "Buy"
,就尝试解析,肯定会抛出异常。 - 忽略了 OpenAI Agents SDK 的限制: 盲目地使用 SDK 的流式传输 API,而没有仔细研究其工作原理,可能会导致意想不到的结果。例如,SDK 内部可能对流式传输的数据进行了处理,导致我们无法直接获得原始的 JSON 数据。
从这些错误尝试中我们可以学到:流式传输结构化输出,绝不是简单的字符串拼接或盲目调用 API 就能实现的。我们需要深入理解 JSON 的结构、pydantic
的验证机制,以及 OpenAI Agents SDK 的工作原理,才能找到正确的解决方案。
流式传输的进阶方案:逐个字段输出
一个可行的方案是,将 结构化输出 分解为更小的单元,例如,逐个字段进行流式传输。具体来说,我们可以让 LLM 先输出 short_term
字段的值,然后再输出 medium_term
字段的值,最后输出 long_term
字段的值。
这种方法的好处在于,每个字段的值都可以被视为一个独立的字符串,我们可以直接将其发送给客户端,而无需等待整个 JSON 对象生成完毕。
实现这种方案,我们需要对 Agent 的 prompt 进行精心的设计,引导 LLM 按照预定的顺序输出字段的值。例如,我们可以这样设计 prompt:
prompt = """
请你为用户提供个性化的投资建议,包括短期、中期和长期三个方面。
请按照以下格式输出:
短期建议:{short_term}
中期建议:{medium_term}
长期建议:{long_term}
"""
然后,我们可以使用 OpenAI Agents SDK 的流式传输 API,逐行读取 LLM 的输出,并提取出每个字段的值。
from openai import OpenAI
from pydantic import BaseModel
from typing import Iterator, AsyncIterator
class InvestmentAdvice(BaseModel):
short_term: str = ""
medium_term: str = ""
long_term: str = ""
class StreamingAgent:
def __init__(self, api_key: str, model: str = "gpt-3.5-turbo-1106"):
self.client = OpenAI(api_key=api_key)
self.model = model
def run(self, prompt: str) -> Iterator[InvestmentAdvice]:
messages = [{"role": "user", "content": prompt}]
stream = self.client.chat.completions.create(
model=self.model,
messages=messages,
stream=True,
response_format={"type": "text"}
)
advice = InvestmentAdvice()
for chunk in stream:
content = chunk.choices[0].delta.content
if content:
# 假设LLM严格按照"字段名: 值"的格式输出
if "短期建议:" in content:
advice.short_term = content.replace("短期建议:", "").strip()
yield advice # 立即返回包含 short_term 的对象
elif "中期建议:" in content:
advice.medium_term = content.replace("中期建议:", "").strip()
elif "长期建议:" in content:
advice.long_term = content.replace("长期建议:", "").strip()
yield advice # 返回包含所有字段的对象
# 示例用法 (需要设置 OPENAI_API_KEY 环境变量)
import os
api_key = os.environ.get("OPENAI_API_KEY")
if api_key:
agent = StreamingAgent(api_key=api_key)
prompt = """
请你为用户提供个性化的投资建议,包括短期、中期和长期三个方面。
请按照以下格式输出:
短期建议:{short_term}
中期建议:{medium_term}
长期建议:{long_term}
"""
for advice in agent.run(prompt):
if advice.short_term:
print(f"短期建议: {advice.short_term}") # 立即展示
if advice.medium_term:
print(f"中期建议: {advice.medium_term}")
if advice.long_term:
print(f"长期建议: {advice.long_term}")
else:
print("请设置 OPENAI_API_KEY 环境变量")
在这个示例中,StreamingAgent.run()
方法返回一个迭代器,每次迭代返回一个 InvestmentAdvice
对象。当 short_term
字段的值被填充时,我们立即返回包含 short_term
的对象,从而实现了 流式传输 的效果。
异步流式传输
如果需要更高的性能,可以使用异步流式传输。 如下示例:
from openai import AsyncOpenAI
from pydantic import BaseModel
from typing import AsyncIterator
class InvestmentAdvice(BaseModel):
short_term: str = ""
medium_term: str = ""
long_term: str = ""
class AsyncStreamingAgent:
def __init__(self, api_key: str, model: str = "gpt-3.5-turbo-1106"):
self.client = AsyncOpenAI(api_key=api_key)
self.model = model
async def run(self, prompt: str) -> AsyncIterator[InvestmentAdvice]:
messages = [{"role": "user", "content": prompt}]
stream = await self.client.chat.completions.create(
model=self.model,
messages=messages,
stream=True,
response_format={"type": "text"}
)
advice = InvestmentAdvice()
async for chunk in stream:
content = chunk.choices[0].delta.content
if content:
# 假设LLM严格按照"字段名: 值"的格式输出
if "短期建议:" in content:
advice.short_term = content.replace("短期建议:", "").strip()
yield advice # 立即返回包含 short_term 的对象
elif "中期建议:" in content:
advice.medium_term = content.replace("中期建议:", "").strip()
elif "长期建议:" in content:
advice.long_term = content.replace("长期建议:", "").strip()
yield advice # 返回包含所有字段的对象
# 示例用法 (需要设置 OPENAI_API_KEY 环境变量)
import asyncio
import os
async def main():
api_key = os.environ.get("OPENAI_API_KEY")
if api_key:
agent = AsyncStreamingAgent(api_key=api_key)
prompt = """
请你为用户提供个性化的投资建议,包括短期、中期和长期三个方面。
请按照以下格式输出:
短期建议:{short_term}
中期建议:{medium_term}
长期建议:{long_term}
"""
async for advice in agent.run(prompt):
if advice.short_term:
print(f"短期建议: {advice.short_term}") # 立即展示
if advice.medium_term:
print(f"中期建议: {advice.medium_term}")
if advice.long_term:
print(f"长期建议: {advice.long_term}")
else:
print("请设置 OPENAI_API_KEY 环境变量")
if __name__ == "__main__":
asyncio.run(main())
注意事项:
- 这种方法依赖于 LLM 能够严格按照预定的格式输出字段的值。如果 LLM 的输出格式不符合预期,可能会导致解析错误。
- 我们需要对 LLM 的输出进行清洗,去除多余的空格和换行符,以保证数据的准确性。
- 这种方法只适用于简单的结构化输出。对于复杂的嵌套结构,我们需要采用更高级的解决方案。
更高级的方案:基于事件的流式传输
对于更复杂的 结构化输出 场景,我们可以考虑基于事件的 流式传输 方案。这种方案的核心思想是,将 LLM 的输出转化为一系列的事件,例如 FieldStart
、FieldData
和 FieldEnd
。
FieldStart
事件表示一个新的字段开始输出。FieldData
事件表示字段的值的一部分。FieldEnd
事件表示字段的输出结束。
客户端可以监听这些事件,并根据事件的类型,逐步构建完整的 JSON 对象。
实现这种方案,我们需要自定义一个解析器,将 LLM 的输出转化为事件流。这个解析器可以使用正则表达式或者状态机来实现。
这种方法的优点在于,它可以处理任意复杂的 结构化输出,并且具有很高的灵活性。但其缺点是,实现起来比较复杂,需要编写大量的代码。
总结与展望:拥抱流式传输的未来
流式传输 是大模型应用的重要发展方向。它不仅能够提升用户体验,还能降低服务器的负载。然而,在 结构化输出 场景下,流式传输 并非易事。我们需要深入理解 JSON 的结构、pydantic
的验证机制,以及 OpenAI Agents SDK 的工作原理,才能找到合适的解决方案。
本文介绍了逐个字段输出和基于事件的流式传输两种方案,希望能对开发者有所启发。未来,随着大模型技术的不断发展,我们相信会出现更简单、更高效的流式传输方案。
让我们一起拥抱 流式传输 的未来,构建更加智能、高效的 AI 应用! 避免在 OpenAI Agents SDK 中,对 JSON 结构产生“无流”的幻觉。