在大模型技术蓬勃发展的今天,我们常常会被“一切顺利”的表象所迷惑。就像 Cilia Madani 在其文章中描述的那样,一个基于 OpenAI Agents SDK 构建的,能够输出结构化 JSON 的投资建议 AI 代理,在非流式传输模式下运行完美。然而,一旦涉及到 流式传输,问题就接踵而至。本文将深入探讨 OpenAI Agents SDK 在处理 结构化输出 时,特别是涉及到 JSON 格式的复杂对象时,流式传输可能遇到的挑战,并提供相应的解决方案。我们将以投资建议场景为例,剖析从 pydantic 模型到流式输出的每一个环节,帮助开发者避开“无流”的幻觉,构建真正高效稳定的 AI 应用。

初始的完美:非流式结构化输出

在文章开篇,作者提到她的 AI 代理能够生成 结构化 JSON 形式的投资建议。这种结构化的优势在于便于机器解析和利用。文章中使用 pydantic 定义了一个 InvestmentAdvice 模型,包含 short_termmedium_termlong_term 三个字段。

from pydantic import BaseModel

class InvestmentAdvice(BaseModel):
    short_term: str
    medium_term: str
    long_term: str

#Agent的简单使用示例,非流式
agent = Agent(output_type=InvestmentAdvice)
output = agent.run("What should I invest in today?")
print(output.short_term)

在这个非流式传输的场景下,Agent.run() 方法一次性返回完整的 InvestmentAdvice 对象,我们可以直接访问其 short_term 属性。这种方式简单直观,对于简单的应用场景来说非常适用。但其缺点也显而易见:用户必须等待所有建议生成完毕才能看到第一条建议,响应时间较长,用户体验欠佳。

更深层次的问题在于,大型语言模型(LLM)生成答案的过程是逐步的,是一个“思考”的过程。如果我们能够将这个思考过程可视化,让用户逐步看到答案的生成,无疑能够大大提升用户的感知速度和信任感。这就是 流式传输 的意义所在。

流式传输的需求与挑战:用户体验至上

作者的需求是:尽快展示第一个投资建议(即 short_term 属性),同时将剩余的建议作为内部上下文,继续进行 agent 的后续运行。这个需求看似简单,实则蕴含了 流式传输结构化输出 场景下的核心挑战。

挑战主要体现在以下几个方面:

  1. JSON 结构的完整性: 流式传输本质上是将 LLM 的输出进行分段,但 JSON 必须是一个完整的、符合语法的对象。如何在流式传输的过程中保证 JSON 的结构完整性,避免出现语法错误,是一个关键问题。
  2. pydantic 模型的适配性: pydantic 模型通常用于验证和序列化完整的对象。如何在流式传输的过程中,逐步填充 pydantic 模型,并在适当的时候进行验证和序列化,需要仔细设计。
  3. OpenAI Agents SDK 的限制: OpenAI Agents SDK 提供了一定的流式传输支持,但对于复杂结构化输出的支持可能存在限制。我们需要深入了解 SDK 的工作原理,才能找到合适的解决方案。
  4. 异步编程的复杂性: 流式传输通常涉及到异步编程,需要处理回调、事件循环等复杂概念。对于不熟悉异步编程的开发者来说,可能会增加开发难度。

流式传输的错误尝试与深刻教训

实现流式输出并非易事,稍有不慎就会陷入各种意想不到的陷阱。作者在文章中并未详细描述她尝试过的错误方法,但我们可以根据常识推断出一些常见的坑:

  1. 直接拼接字符串: 最简单粗暴的方法是直接拼接字符串,试图手动构建 JSON 对象。这种方法很容易出错,特别是在处理复杂的嵌套结构时。例如,如果 short_term 建议中包含特殊字符,就需要进行转义,否则会导致 JSON 解析失败。
  2. 过早地尝试解析 JSON: 如果在 JSON 对象尚未完整生成时就尝试解析,必然会导致解析错误。例如,如果只收到了 {"short_term": "Buy",就尝试解析,肯定会抛出异常。
  3. 忽略了 OpenAI Agents SDK 的限制: 盲目地使用 SDK 的流式传输 API,而没有仔细研究其工作原理,可能会导致意想不到的结果。例如,SDK 内部可能对流式传输的数据进行了处理,导致我们无法直接获得原始的 JSON 数据。

从这些错误尝试中我们可以学到:流式传输结构化输出,绝不是简单的字符串拼接或盲目调用 API 就能实现的。我们需要深入理解 JSON 的结构、pydantic 的验证机制,以及 OpenAI Agents SDK 的工作原理,才能找到正确的解决方案。

流式传输的进阶方案:逐个字段输出

一个可行的方案是,将 结构化输出 分解为更小的单元,例如,逐个字段进行流式传输。具体来说,我们可以让 LLM 先输出 short_term 字段的值,然后再输出 medium_term 字段的值,最后输出 long_term 字段的值。

这种方法的好处在于,每个字段的值都可以被视为一个独立的字符串,我们可以直接将其发送给客户端,而无需等待整个 JSON 对象生成完毕。

实现这种方案,我们需要对 Agent 的 prompt 进行精心的设计,引导 LLM 按照预定的顺序输出字段的值。例如,我们可以这样设计 prompt:

prompt = """
请你为用户提供个性化的投资建议,包括短期、中期和长期三个方面。
请按照以下格式输出:
短期建议:{short_term}
中期建议:{medium_term}
长期建议:{long_term}
"""

然后,我们可以使用 OpenAI Agents SDK 的流式传输 API,逐行读取 LLM 的输出,并提取出每个字段的值。

from openai import OpenAI
from pydantic import BaseModel
from typing import Iterator, AsyncIterator

class InvestmentAdvice(BaseModel):
    short_term: str = ""
    medium_term: str = ""
    long_term: str = ""

class StreamingAgent:
    def __init__(self, api_key: str, model: str = "gpt-3.5-turbo-1106"):
        self.client = OpenAI(api_key=api_key)
        self.model = model

    def run(self, prompt: str) -> Iterator[InvestmentAdvice]:
        messages = [{"role": "user", "content": prompt}]
        stream = self.client.chat.completions.create(
            model=self.model,
            messages=messages,
            stream=True,
            response_format={"type": "text"}
        )
        advice = InvestmentAdvice()
        for chunk in stream:
            content = chunk.choices[0].delta.content
            if content:
                # 假设LLM严格按照"字段名: 值"的格式输出
                if "短期建议:" in content:
                    advice.short_term = content.replace("短期建议:", "").strip()
                    yield advice # 立即返回包含 short_term 的对象
                elif "中期建议:" in content:
                    advice.medium_term = content.replace("中期建议:", "").strip()
                elif "长期建议:" in content:
                    advice.long_term = content.replace("长期建议:", "").strip()

        yield advice # 返回包含所有字段的对象

# 示例用法 (需要设置 OPENAI_API_KEY 环境变量)
import os
api_key = os.environ.get("OPENAI_API_KEY")
if api_key:
    agent = StreamingAgent(api_key=api_key)
    prompt = """
    请你为用户提供个性化的投资建议,包括短期、中期和长期三个方面。
    请按照以下格式输出:
    短期建议:{short_term}
    中期建议:{medium_term}
    长期建议:{long_term}
    """

    for advice in agent.run(prompt):
        if advice.short_term:
            print(f"短期建议: {advice.short_term}") # 立即展示
        if advice.medium_term:
            print(f"中期建议: {advice.medium_term}")
        if advice.long_term:
            print(f"长期建议: {advice.long_term}")
else:
    print("请设置 OPENAI_API_KEY 环境变量")

在这个示例中,StreamingAgent.run() 方法返回一个迭代器,每次迭代返回一个 InvestmentAdvice 对象。当 short_term 字段的值被填充时,我们立即返回包含 short_term 的对象,从而实现了 流式传输 的效果。

异步流式传输

如果需要更高的性能,可以使用异步流式传输。 如下示例:

from openai import AsyncOpenAI
from pydantic import BaseModel
from typing import AsyncIterator

class InvestmentAdvice(BaseModel):
    short_term: str = ""
    medium_term: str = ""
    long_term: str = ""

class AsyncStreamingAgent:
    def __init__(self, api_key: str, model: str = "gpt-3.5-turbo-1106"):
        self.client = AsyncOpenAI(api_key=api_key)
        self.model = model

    async def run(self, prompt: str) -> AsyncIterator[InvestmentAdvice]:
        messages = [{"role": "user", "content": prompt}]
        stream = await self.client.chat.completions.create(
            model=self.model,
            messages=messages,
            stream=True,
            response_format={"type": "text"}
        )
        advice = InvestmentAdvice()
        async for chunk in stream:
            content = chunk.choices[0].delta.content
            if content:
                # 假设LLM严格按照"字段名: 值"的格式输出
                if "短期建议:" in content:
                    advice.short_term = content.replace("短期建议:", "").strip()
                    yield advice # 立即返回包含 short_term 的对象
                elif "中期建议:" in content:
                    advice.medium_term = content.replace("中期建议:", "").strip()
                elif "长期建议:" in content:
                    advice.long_term = content.replace("长期建议:", "").strip()

        yield advice # 返回包含所有字段的对象

# 示例用法 (需要设置 OPENAI_API_KEY 环境变量)
import asyncio
import os

async def main():
    api_key = os.environ.get("OPENAI_API_KEY")
    if api_key:
        agent = AsyncStreamingAgent(api_key=api_key)
        prompt = """
        请你为用户提供个性化的投资建议,包括短期、中期和长期三个方面。
        请按照以下格式输出:
        短期建议:{short_term}
        中期建议:{medium_term}
        长期建议:{long_term}
        """

        async for advice in agent.run(prompt):
            if advice.short_term:
                print(f"短期建议: {advice.short_term}") # 立即展示
            if advice.medium_term:
                print(f"中期建议: {advice.medium_term}")
            if advice.long_term:
                print(f"长期建议: {advice.long_term}")
    else:
        print("请设置 OPENAI_API_KEY 环境变量")

if __name__ == "__main__":
    asyncio.run(main())

注意事项:

  • 这种方法依赖于 LLM 能够严格按照预定的格式输出字段的值。如果 LLM 的输出格式不符合预期,可能会导致解析错误。
  • 我们需要对 LLM 的输出进行清洗,去除多余的空格和换行符,以保证数据的准确性。
  • 这种方法只适用于简单的结构化输出。对于复杂的嵌套结构,我们需要采用更高级的解决方案。

更高级的方案:基于事件的流式传输

对于更复杂的 结构化输出 场景,我们可以考虑基于事件的 流式传输 方案。这种方案的核心思想是,将 LLM 的输出转化为一系列的事件,例如 FieldStartFieldDataFieldEnd

  • FieldStart 事件表示一个新的字段开始输出。
  • FieldData 事件表示字段的值的一部分。
  • FieldEnd 事件表示字段的输出结束。

客户端可以监听这些事件,并根据事件的类型,逐步构建完整的 JSON 对象。

实现这种方案,我们需要自定义一个解析器,将 LLM 的输出转化为事件流。这个解析器可以使用正则表达式或者状态机来实现。

这种方法的优点在于,它可以处理任意复杂的 结构化输出,并且具有很高的灵活性。但其缺点是,实现起来比较复杂,需要编写大量的代码。

总结与展望:拥抱流式传输的未来

流式传输 是大模型应用的重要发展方向。它不仅能够提升用户体验,还能降低服务器的负载。然而,在 结构化输出 场景下,流式传输 并非易事。我们需要深入理解 JSON 的结构、pydantic 的验证机制,以及 OpenAI Agents SDK 的工作原理,才能找到合适的解决方案。

本文介绍了逐个字段输出和基于事件的流式传输两种方案,希望能对开发者有所启发。未来,随着大模型技术的不断发展,我们相信会出现更简单、更高效的流式传输方案。

让我们一起拥抱 流式传输 的未来,构建更加智能、高效的 AI 应用! 避免在 OpenAI Agents SDK 中,对 JSON 结构产生“无流”的幻觉。

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注