你是否曾经历过,满怀期待地让 LLM (大型语言模型) 为你撰写一篇博客文章,却只能眼巴巴地盯着加载指示器,不知是浏览器崩溃了还是 AI 还在苦苦挣扎? 今天,我们将解决这个问题,构建一个能够执行繁重任务(如撰写完整文章)且不会卡死的 AI Web 应用。 你将能够实时看到进度更新,随时了解 AI 的工作状态,让长时间运行的 AI 任务变得快速且互动。

1. 告别“死亡旋转”:长任务为何让应用崩溃

想象一下,你的新型 AI Web 应用 是一位天才内容写作者。 你让它“撰写一篇关于太空探索的文章”,点击回车,然后… 可怕的加载指示器出现了! 整个页面冻结,无法点击任何内容。 一分钟后,浏览器甚至会弹出“此页面无响应”的错误提示。 这就是 Web 应用中长时间运行任务的经典问题。 传统的 Web 应用工作方式如下:

  1. 你发出请求(例如,“生成我的文章!”)。
  2. 服务器处理请求(例如,调用 LLM,耗时 2-3 分钟)。
  3. 你等待… 再等待…(连接保持打开状态,你的浏览器被卡住)。
  4. 最终,你得到结果(或者超时错误!)。

这种用户体验非常糟糕。 用户需要知道他们的请求正在被处理,并能看到进度。

2. 解决方案:智能餐厅类比

将上述过程想象成在餐厅点餐:

  • 糟糕的方式 (标准 Web 请求):你点了一份牛排。 服务员站在你的餐桌旁,一动不动地盯着你看,整整 15 分钟的烹饪时间。 很尴尬,对吧? 你甚至没法要杯水。
  • 更好的方式 (我们的方法):你点了一份牛排。 服务员说:“好的,稍等!我这就通知厨师。”然后走开(BackgroundTasks)。 几分钟后,他们给你端来一些面包(“正在准备!”)。 过一会儿,你的饮料也上来了(“快做好了!”)。 在主菜准备好的同时,你感到开心且被告知进度。

这正是我们今天要构建的 AI Web 应用 所要实现的效果。 我们的应用将:

  • 立即确认用户的请求。
  • 将繁重的 AI 工作卸载到 BackgroundTasks 中。
  • 通过 Server-Sent Events (SSE) 将实时进度更新流式传输回用户。

实现此目标的工具包:

  • FastAPI BackgroundTasks:用于运行 AI 作业,而不会冻结应用。
  • Server-Sent Events (SSE):一种从服务器向浏览器推送实时更新的简单方法。
  • PocketFlow:用于组织我们的多步骤文章撰写流程。

3. 工具介绍:FastAPI BackgroundTasks 和 Server-Sent Events (SSE)

为了构建响应迅速的 AI Web 应用 文章编写器,我们需要两项关键技术:一项用于在后台处理工作,另一项用于报告其进度。

3.1 FastAPI BackgroundTasks:后台工作者

BackgroundTasks 视为将任务交给独立工作的助手。 你告诉 FastAPI:“嘿,在通知用户我收到他们的请求后,请在后台运行这个函数。” 关键在于,用户会立即收到响应。 他们不必等待后台工作完成。

这就像从亚马逊订购商品。 你会立即收到“订单已确认!”的电子邮件。 实际的拣货、包装和运输过程稍后会在后台进行。

下面是一个简单的示例:在用户注册后发送欢迎电子邮件。

from fastapi import FastAPI, BackgroundTasks

app = FastAPI()

# 这是一个耗时的任务(例如,调用电子邮件服务)
def send_welcome_email(email: str):
    import time
    time.sleep(5)  # 模拟 5 秒延迟
    print(f"Email sent to {email}! (This prints in the server console)")

@app.post("/signup")
async def user_signup(email: str, background_tasks: BackgroundTasks):
    # 将耗时的电子邮件任务添加到后台
    background_tasks.add_task(send_welcome_email, email)
    # 立即向用户返回响应
    return {"message": f"Thanks for signing up, {email}! Check your inbox soon."}

发生了什么? 当你向 /signup 发送请求时,FastAPI 会看到 background_tasks.add_task(...)。 它会立即发回 {"message": "Thanks..."} 响应。 你的浏览器很开心且响应迅速。 发送响应后,FastAPI 会在后台运行 send_welcome_email()

这非常适合我们的 AI Web 应用 文章生成器! 我们将使用它来运行整个 AI 写作过程。

3.2 Server-Sent Events (SSE):你的实时进度指示器

工作正在后台进行。 但我们如何告诉用户发生了什么? 这就是 Server-Sent Events (SSE) 的用武之地。 SSE 是一种超简单的方式,让服务器通过单个单向连接向浏览器推送更新。 这就像一个实时新闻滚动条:服务器会在新标题出现时发送它们,而你的浏览器只需监听。

为什么不再次使用 WebSockets? WebSockets(来自第 3 部分)非常适合双向聊天。 但对于仅发送单向进度更新,它们有点像使用对讲机,而你只需要一个寻呼机。 SSE 更简单、更轻巧,并且专为此“服务器到客户端”的推送场景而设计。

以下是 FastAPI 中 SSE 端点有多简单:

import asyncio, json
from fastapi import FastAPI
from fastapi.responses import StreamingResponse

app = FastAPI()

async def progress_generator():
    for i in range(1, 6):
        # 在实际应用中,这将是来自我们 AI 的进度更新
        yield f"data: {json.dumps({'progress': i*20, 'step': f'Step {i} done'})}\n\n"
        await asyncio.sleep(1)  # 等待 1 秒

@app.get("/stream-progress")
async def stream_progress():
    return StreamingResponse(progress_generator(), media_type="text/event-stream")

在浏览器端,JavaScript 同样简单:

<div id="progressStatus">Waiting for progress...</div>
<script>
    const progressStatus = document.getElementById('progressStatus');
    const eventSource = new EventSource("/stream-progress"); // Connect to our stream!
    eventSource.onmessage = (event) => {
        const data = JSON.parse(event.data);
        progressStatus.textContent = `Progress: ${data.progress}% - ${data.step}`;
    };
</script>

当你打开此页面时,eventSource 会连接到我们的端点,并且 progressStatus div 将每秒更新一次。 简单而有效!

4. AI 的待办事项清单:PocketFlow 工作流

那么,我们的 AI 如何实际撰写文章? 这不是一蹴而就的。 我们需要为它提供一个循序渐进的计划,就像食谱一样。 这就是 PocketFlow 帮助我们的地方。 它将“撰写文章”的大任务分解为小的、可管理的节点(或步骤)。

4.1 中央枢纽:我们的共享字典

在深入了解节点之前,让我们看一下我们操作的核心:一个简单的 Python 字典。 我们所有的节点都将读取和写入这个中央共享数据中心。 这是它们相互传递信息的方式。

这是作业开始时的样子:

shared_data = {
    "topic": "The user's article topic",
    "sse_queue": None,  # The "mailbox" for progress messages
    "sections": [],     # Will be filled by the Outline node
    "draft": "",        # Will be filled by the Content node
    "final_article": ""  # The final result!
}

sse_queue 是我们的“邮箱”。 每个节点都会将进度更新放入其中,我们的 FastAPI 服务器将从中读取更新用户。

4.2 步骤 1:GenerateOutline 节点

此节点的唯一工作是创建文章的结构。 (我们假设存在一个 call_llm 函数,它与 LLM 通信)。

class GenerateOutline(Node):
    def prep(self, shared):
        return shared["topic"]
    def exec(self, topic):
        prompt = f"Create 3 section titles for an article on '{topic}'."
        return call_llm(prompt)  # e.g., "Intro,Main Points,Conclusion"
    def post(self, shared, outline_str):
        sections = outline_str.split(',')
        shared["sections"] = sections
        progress = {"step": "outline", "progress": 33, "data": sections}
        shared["sse_queue"].put_nowait(progress)

这里发生了什么:

  • prep 从共享字典中获取用户的主题。
  • exec 接受该主题并要求 LLM 提供一个大纲。
  • post 保存大纲以供下一步使用,并且最重要的是,将进度消息放入邮箱中。 这告诉前端,“嘿,我已经完成 33% 了!”

4.3 步骤 2:WriteContent 节点

此节点采用大纲并为每个部分编写内容。

class WriteContent(Node):
    def prep(self, shared):
        return shared["sections"], shared["sse_queue"]
    def exec(self, prep_result):
        sections, queue = prep_result
        full_draft = ""
        for i, section in enumerate(sections):
            prompt = f"Write a paragraph for the section: '{section}'."
            paragraph = call_llm(prompt)
            full_draft += f"<h2>{section}</h2>\n<p>{paragraph}</p>\n"
            progress = {"step": "writing", "progress": 33 + ((i+1)*20)}
            queue.put_nowait(progress)
        return full_draft
    def post(self, shared, full_draft):
        shared["draft"] = full_draft

它的优点: 在 exec 循环内部,在编写每个段落之后,我们立即发送另一个进度更新。 这意味着用户将在此步骤中看到进度条多次向前跳跃,从而使应用感觉非常响应迅速。

4.4 步骤 3:ApplyStyle 节点

这是最后的润色。 它采用组合的草稿并要求 LLM 对其进行润色。

class ApplyStyle(Node):
    def prep(self, shared):
        return shared["draft"]
    def exec(self, draft):
        prompt = f"Rewrite this draft in an engaging style: {draft[:500]}"
        return call_llm(prompt)
    def post(self, shared, final_article):
        shared["final_article"] = final_article
        progress = {"step": "complete", "progress": 100, "data": final_article}
        shared["sse_queue"].put_nowait(progress)

最后的结局: 此节点进行最后的重写。 至关重要的是,它将 complete 消息与 progress: 100 一起发送到邮箱。 这告诉我们的前端,作业已完成并且最终文章已准备就绪!

4.5 将所有内容与 Flow 连接起来

最后,我们只需要告诉 PocketFlow 我们的待办事项列表的顺序。

from pocketflow import Flow

def create_article_flow():
    outline_node = GenerateOutline()
    content_node = WriteContent()
    style_node = ApplyStyle()
    # Define the sequence: outline -> write -> style
    outline_node >> content_node >> style_node
    return Flow(start_node=outline_node)

这非常易读:outline_node 运行,然后是 content_node,然后是 style_node。 此 Flow 对象是我们的 FastAPI BackgroundTasks 最终将运行的对象。

有了 AI 的“待办事项列表”,让我们将其连接到我们的 FastAPI 后端。

5. 连接各个部分:FastAPI 后端

AI 已经有了 PocketFlow 的“待办事项列表”。 现在,让我们构建一个 Web 服务器,作为项目经理。 它将接收来自用户的请求,将工作交给我们的 AI,并报告进度。

我们将逐步完成主要的 main.py 文件。

5.1 作业中心

首先,我们需要一个地方来跟踪当前正在运行的所有文章撰写作业。 一个简单的 Python 字典非常适合此目的。

# A dictionary to hold our active jobs
# Key: A unique job_id (string)
# Value: The "mailbox" (asyncio.Queue) for that job's messages
active_jobs = {}

active_jobs 视为办公室的前台。 当新作业进来时,我们给它一个工单号 (job_id) 和一个专用的邮箱 (asyncio.Queue),用于所有内部备忘录。

5.2 启动作业

这是用户与之交互的第一件事。 他们将他们的文章主题发送到我们的 /start-job 端点,这会启动整个过程。

@app.post("/start-job")
async def start_job(background_tasks: BackgroundTasks, topic: str = Form(...)):
    job_id = str(uuid.uuid4())
    # Create a new, empty mailbox for this specific job
    sse_queue = asyncio.Queue()
    active_jobs[job_id] = sse_queue
    # Tell FastAPI: "Run this function in the background"
    background_tasks.add_task(run_article_workflow, job_id, topic)
    # IMMEDIATELY send a response back to the user
    return {"job_id": job_id}

分解一下:

  • 获取工单号job_id = str(uuid.uuid4()) 为此请求创建一个唯一 ID。
  • 创建邮箱sse_queue = asyncio.Queue() 创建消息队列。 然后我们使用 job_id 将其存储在我们的 active_jobs 字典中。
  • 移交工作background_tasks.add_task(...) 是魔术。 它告诉 FastAPI,“不要等待! 在你发送响应后,开始运行 run_article_workflow 函数。”
  • 即时回复return {"job_id": job_id} 立即发送回用户的浏览器。 用户的页面不会冻结!

5.3 后台工作者

这是在后台运行的函数。 它是实际运行我们的 PocketFlow 待办事项列表的项目经理。

def run_article_workflow(job_id: str, topic: str):
    sse_queue = active_jobs[job_id]
    shared = {
        "topic": topic,
        "sse_queue": sse_queue,  # Here's where we pass the mailbox in!
        "sections": [],
        "draft": "",
        "final_article": ""
    }
    flow = create_article_flow()
    flow.run(shared)  # Start the PocketFlow!

这是关键的连接:

  • 它从我们的 active_jobs 字典中获取此作业的正确 sse_queue (邮箱)。
  • 它创建共享的数据字典并将 sse_queue 放入其中。

当调用 flow.run(shared) 时,我们的 PocketFlow 节点现在可以访问此队列,并将它们的进度消息放入其中!

5.4 流式传输进度更新

当后台任务运行时,用户的浏览器连接到我们的 /progress/{job_id} 端点以监听更新。

此函数看起来有点复杂,但它只是一个检查邮箱的循环。

@app.get("/progress/{job_id}")
async def get_progress(job_id: str):
    async def event_stream():
        # First, find the right mailbox for this job
        if job_id not in active_jobs:
            yield 'data: {"error": "Job not found"}\n\n'
            return
        sse_queue = active_jobs[job_id]
        while True:
            # Wait for a new message to arrive in the mailbox
            progress_msg = await sse_queue.get()
            yield f"data: {json.dumps(progress_msg)}\n\n"
            # If the message says "complete", we're done!
            if progress_msg.get("step") == "complete":
                del active_jobs[job_id]  # Clean up the job
                break
    return StreamingResponse(event_stream(), media_type="text/event-stream")

逻辑很简单:

  • event_stream 是一个特殊的异步生成器,可以随时间发送 (yield) 数据。
  • 它找到 job_id 的正确邮箱 (sse_queue)。
  • while True: 循环开始。
  • await sse_queue.get() 暂停并等待直到消息出现在邮箱中。
  • 一旦 PocketFlow 节点将消息放入,此行就会醒来,获取消息,并使用 yield 将其发送到浏览器。
  • 它会一直这样做,直到看到 "step": "complete" 消息,此时它会清理并关闭连接。

这就是整个系统! 这是一个干净的循环:用户启动一个作业,后台工作者运行它,同时将消息放入邮箱中,而流媒体从该邮箱中读取以保持用户更新。

6. 任务完成! 你的应用现在可以处理繁重的工作

你成功了! 你已成功构建了一个 Web 应用,可以处理长时间、复杂的 AI 任务,而不会崩溃或让你的用户感到沮丧。 他们会收到即时响应和实时进度更新,从而使整个体验感觉流畅、交互和专业。

再也没有可怕的加载指示器或“页面无响应”错误。 你的 AI Web 应用 现在就像一个现代的智能助手:它会确认你的请求,在后台努力工作,并让你随时了解每一步。

你今天取得的成就:

  • 不再冻结 UI:使用 FastAPI 的 BackgroundTasks 卸载繁重的 AI 工作,因此你的应用保持响应。
  • 实时进度更新:掌握 Server-Sent Events (SSE) 以将状态更新从服务器实时流式传输到浏览器。
  • 干净、有组织的逻辑:使用 PocketFlow 构造复杂的、多步骤的 AI 作业,使你的 AI 逻辑与你的 Web 代码分离。
  • 将所有内容连接起来:使用 asyncio.Queue 作为简单的“邮箱”,让你的 BackgroundTasks 与你的 Web 服务器通信。

这种架构是构建严肃的 AI 应用的改变游戏规则的方法。 你现在拥有创建工具的技能,这些工具可以生成报告、分析数据或执行任何其他耗时的任务,同时让你的用户感到满意和参与。

总而言之,利用 FastAPIBackgroundTasksSSE 技术,我们能够创建一个高效的 AI Web 应用,它不仅能够处理复杂的 LLM 任务,还能提供卓越的用户体验。无论是数据分析、内容生成还是其他 AI 驱动的应用场景,这种架构都将助力开发者构建更加智能、响应迅速且易于管理的解决方案。拥抱这些技术,开启你的 AI 开发新篇章。

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注