在使用大型语言模型 (LLM) 的应用开发中,将非结构化文本转换为结构化数据(例如 JSON),以便用于表单、API 或数据分析,已经成为一种常见的需求。然而,生成结构化输出,特别是复杂或冗长的输出,往往会显著降低应用程序的响应速度。为了解决这个问题,流式传输结构化输出应运而生。本文将深入探讨如何使用 FastAPI 和 PydanticAI 实现 LLM 结构化 JSON 输出的实时流式传输,从而显著提升用户体验,并打造更加智能的应用。

为什么需要流式传输结构化输出?

在快节奏的互联网时代,用户对应用程序的响应时间有着极高的要求。传统的做法是等待 LLM 生成完整的 JSON 响应后,再将其发送给客户端。这种方式会导致用户需要等待较长的时间,尤其是在处理复杂数据时,等待时间甚至可能达到 10-30 秒。长时间的等待会严重影响用户体验,降低用户参与度,并可能导致用户流失。

流式传输结构化输出则可以有效解决这个问题。通过在 LLM 生成数据的同时,逐步将部分结果发送给客户端,用户可以立即看到更新,而无需等待整个过程完成。这种实时反馈机制可以显著降低用户感知的延迟,提高用户参与度,并使应用程序感觉更快、更智能。

例如,在一个简历解析应用中,如果使用传统的等待方式,用户可能需要等待 10 秒才能看到解析后的简历信息。而使用流式传输后,用户可以在 LLM 解析出姓名、邮箱等信息的第一时间就看到这些信息,随着解析的进行,更多信息会逐步显示出来。这种即时反馈能够极大地提升用户体验。

PydanticAI:定义和强制结构化输出

PydanticAI 是由 Pydantic 的创建者开发的 Python 库,它弥合了自由格式 LLM 输出和严格数据结构之间的差距。它允许开发者定义明确的 Schema,从而确保 LLM 的输出符合预期的格式。

使用 PydanticAI,你可以定义一个描述所需 JSON 结构的 Schema。然后,你可以将该 Schema 提供给 LLM,并指示它生成符合该 Schema 的输出。 PydanticAI 会自动验证 LLM 的输出,并确保其符合定义的 Schema

例如,在简历解析的案例中,我们可以使用 TypedDict 定义一个 CompleteResume Schema,该 Schema 包含联系方式、摘要、工作经验、教育背景等字段。

from typing import List
from typing_extensions import TypedDict
from pydantic_ai import Agent

# 定义简历的结构
class ContactInfo(TypedDict, total=False):
    name: str
    email: str
    phone: str
    possible_work_locations: List[str]

class WorkExperience(TypedDict, total=False):
    role: str
    company: str
    location: str
    dates: str
    descriptions: List[str]

class Education(TypedDict, total=False):
    degree: str
    university: str
    dates: str
    achievements: List[str]

class Certification(TypedDict, total=False):
    name: str
    organization: str
    date: str

class Project(TypedDict, total=False):
    name: str
    tech_stack: List[str]
    purpose: str

class SkillCategory(TypedDict, total=False):
    category: str
    skills: List[str]

class CompleteResume(TypedDict, total=False):
    contact_info: ContactInfo
    summary: str
    work_experience: List[WorkExperience]
    education: List[Education]
    certification_and_training: List[Certification]
    projects: List[Project]
    skill_categories: List[SkillCategory]

agent = Agent(
    'openai:gpt-4.1-mini',
    output_type=CompleteResume,
    system_prompt="""
    Parse resumes into structured JSON format with these sections:
    - contact_info: name, email, phone, locations
      - summary: professional summary
    - work_experience: jobs with role, company, location, dates, descriptions
    - education: degrees, universities, dates, achievements
    - certification_and_training: certifications and training
    - projects: projects with tech stack and purpose
    - skill_categories: skills grouped by category
    Extract as much detail as possible from the resume text.
    """
)

在上述代码中,我们定义了 CompleteResume Schema,并使用 Agent 类将该 Schema 与 LLM (openai:gpt-4.1-mini) 关联起来。system_prompt 指示 LLM 将简历解析为指定的 JSON 格式。

使用 run_stream() 方法实现流式传输

PydanticAI 提供了 run_stream() 方法,该方法可以实现 LLM 输出的 流式传输run_stream() 方法会返回一个异步迭代器,该迭代器会逐步生成部分结构化输出。

以下代码展示了如何使用 run_stream() 方法来 流式传输 简历解析的结果:

import json

async def generate_stream(resume_text: str):
    async with agent.run_stream(resume_text) as result:
        async for partial_resume in result.stream():
            json_data = json.dumps(partial_resume, default=str)
            yield f"data: {json_data}\n\n"

run_stream() 方法返回一个异步上下文管理器,使用 async with 语句可以确保在完成 流式传输 后,资源能够被正确释放。result.stream() 方法返回一个异步迭代器,该迭代器会逐步生成 CompleteResume 类型的对象。我们使用 json.dumps() 方法将每个部分结果转换为 JSON 字符串,并使用 data: 前缀和双换行符 \n\n 对 JSON 数据进行格式化,使其符合 Server-Sent Events (SSE) 协议。

FastAPI:构建实时流式 JSON 输出的后端

FastAPI 是一个高性能的 Python Web 框架,它非常适合构建 API。我们可以使用 FastAPI 来构建一个后端端点,该端点接收简历文本作为输入,并使用 PydanticAIrun_stream() 方法来 流式传输 结构化 JSON 输出。

以下代码展示了如何使用 FastAPI 来构建一个 /parse-resume 端点:

from fastapi import FastAPI, Request, HTTPException
from fastapi.responses import StreamingResponse

app = FastAPI()

@app.post("/parse-resume")
async def parse_resume(request: Request):
    body = await request.json()
    resume_text = body.get("text", "").strip()
    if not resume_text:
        raise HTTPException(status_code=400, detail="Resume text is required")

    async def generate_stream():
        try:
            async with agent.run_stream(resume_text) as result:
                async for partial_resume in result.stream():
                    json_data = json.dumps(partial_resume, default=str)
                    yield f"data: {json_data}\n\n"
        except Exception as e:
            yield f"data: {json.dumps({'error': str(e)})}\n\n"

    return StreamingResponse(
        generate_stream(),
        media_type="text/event-stream",
        headers={"Connection": "keep-alive"}
    )

/parse-resume 端点接收一个 POST 请求,该请求包含简历文本。该端点调用 generate_stream() 函数来生成 流式传输 的 JSON 输出。StreamingResponse 类用于将 流式传输 的数据发送给客户端。media_type 参数设置为 text/event-stream,以指示客户端该响应是一个 SSE 流。headers 参数设置为 {"Connection": "keep-alive"},以保持客户端和服务器之间的连接。

React:使用 fetch-event-source 处理 SSE 流

在前端,我们可以使用 React 和 @microsoft/fetch-event-source 库来处理 SSE 流,并实时更新 UI。@microsoft/fetch-event-source 库是原生 EventSource API 的一个替代品,它支持 POST 请求,而原生 EventSource API 仅支持 GET 请求。

以下代码展示了如何使用 React 和 @microsoft/fetch-event-source 库来处理 SSE 流:

import { fetchEventSource } from '@microsoft/fetch-event-source';
import { useState } from 'react';

function ResumeParser() {
  const [resumeText, setResumeText] = useState('');
  const [parsedResume, setParsedResume] = useState({});
  const [isStreaming, setIsStreaming] = useState(false);

  const handleParseResume = async () => {
    setIsStreaming(true);
    await fetchEventSource('http://localhost:8000/parse-resume', {
      method: 'POST',
      headers: { 'Content-Type': 'application/json' },
      body: JSON.stringify({ text: resumeText }),
      onmessage(event) {
        const data = JSON.parse(event.data);
        setParsedResume(prevResume => ({...prevResume, ...data})); // 增量更新 UI
      },
      onclose() {
        setIsStreaming(false);
      },
      onerror(err) {
        console.error('Stream error:', err);
        setIsStreaming(false);
      }
    });
  };

  return (
    <div>
      <textarea value={resumeText} onChange={e => setResumeText(e.target.value)} />
      <button onClick={handleParseResume} disabled={isStreaming}>
        {isStreaming ? 'Parsing...' : 'Parse Resume'}
      </button>
      <pre>{JSON.stringify(parsedResume, null, 2)}</pre>
    </div>
  );
}

export default ResumeParser;

handleParseResume 函数使用 fetchEventSource 函数向 /parse-resume 端点发送一个 POST 请求。onmessage 回调函数在每次接收到新的数据块时被调用。该回调函数解析 JSON 数据,并使用 setParsedResume 函数更新 React 状态,从而触发 UI 的重新渲染。通过使用增量更新 setParsedResume(prevResume => ({...prevResume, ...data})),新的数据块会被合并到现有的 parsedResume 状态中,从而避免了覆盖先前接收到的数据。onclose 回调函数在连接关闭时被调用。onerror 回调函数在发生错误时被调用。

优化技巧与注意事项

  1. 错误处理: 在服务器端,务必包含适当的错误处理机制。在 generate_stream 函数中,使用 try...except 块捕获异常,并将错误信息以 JSON 格式发送给客户端。在客户端,onerror 回调函数可以处理这些错误信息,并向用户显示友好的错误提示。
  2. 模型选择: PydanticAIAgent 可以灵活选择不同的 LLM 模型。GPT-4 或其他更强大的模型可能提供更准确的结构化输出,但也会增加延迟。根据应用需求选择合适的模型,平衡准确性和响应速度。
  3. 提示工程 (Prompt Engineering): 通过精心设计的 system_prompt,可以引导 LLM 生成更符合 Schema 要求的输出。明确指令和示例,确保 LLM 理解所需的结构化格式。
  4. 数据校验: 虽然 PydanticAI 能够进行 Schema 验证,但在客户端和服务端都应该进行额外的数据校验,以确保数据的完整性和安全性。
  5. 长连接管理: SSE 依赖于长连接。需要配置服务器和客户端,以确保连接的稳定性和可靠性。例如,可以设置心跳检测机制,定期发送数据包以保持连接。
  6. 流式传输的速率限制: 为了避免客户端过载,可以在服务器端实现 流式传输 的速率限制。例如,可以设置每秒发送的数据块数量上限。
  7. 中间件的使用: 在 FastAPI 中可以使用中间件来处理跨域问题、身份验证和日志记录等。
  8. 状态管理: 在复杂的应用中,可以使用 Redux 或 Zustand 等状态管理库来管理客户端的状态,从而简化 UI 的更新逻辑。
  9. 使用 WebSockets: 虽然 SSE 适合单向数据流,但如果需要双向通信,可以考虑使用 WebSockets。

结论:构建快速、智能的 AI 应用

通过本文的介绍,我们了解了如何使用 FastAPIPydanticAI 实现 LLM 结构化 JSON 输出的实时流式传输。这种方法可以显著提升应用程序的速度、用户体验和响应能力。使用 PydanticAI,我们可以定义结构化的 Schema,并让 LLM 填充它们。使用 FastAPI,我们可以高效地将结果 流式传输 到前端。使用 React 和 fetch-event-source,我们可以保持 UI 的响应性和动态性。

不再需要漫长的等待或加载动画。只需要快速、干净、实时的 JSON —— 这正是用户所需要的。

如果您正在构建基于 LLM 的 AI 应用,那么 流式传输 结构化输出无疑是提升速度、用户体验和响应能力的最佳途径之一。通过结合 PydanticAIFastAPI 和 React,您可以构建出真正快速、智能且用户友好的 AI 应用。

希望本文能够帮助您更好地理解和应用 流式传输 技术,并构建出更加出色的 AI 应用!