你是否曾经历过,满怀期待地让 LLM (大型语言模型) 为你撰写一篇博客文章,却只能眼巴巴地盯着加载指示器,不知是浏览器崩溃了还是 AI 还在苦苦挣扎? 今天,我们将解决这个问题,构建一个能够执行繁重任务(如撰写完整文章)且不会卡死的 AI Web 应用。 你将能够实时看到进度更新,随时了解 AI 的工作状态,让长时间运行的 AI 任务变得快速且互动。
1. 告别“死亡旋转”:长任务为何让应用崩溃
想象一下,你的新型 AI Web 应用 是一位天才内容写作者。 你让它“撰写一篇关于太空探索的文章”,点击回车,然后… 可怕的加载指示器出现了! 整个页面冻结,无法点击任何内容。 一分钟后,浏览器甚至会弹出“此页面无响应”的错误提示。 这就是 Web 应用中长时间运行任务的经典问题。 传统的 Web 应用工作方式如下:
- 你发出请求(例如,“生成我的文章!”)。
- 服务器处理请求(例如,调用 LLM,耗时 2-3 分钟)。
- 你等待… 再等待…(连接保持打开状态,你的浏览器被卡住)。
- 最终,你得到结果(或者超时错误!)。
这种用户体验非常糟糕。 用户需要知道他们的请求正在被处理,并能看到进度。
2. 解决方案:智能餐厅类比
将上述过程想象成在餐厅点餐:
- 糟糕的方式 (标准 Web 请求):你点了一份牛排。 服务员站在你的餐桌旁,一动不动地盯着你看,整整 15 分钟的烹饪时间。 很尴尬,对吧? 你甚至没法要杯水。
- 更好的方式 (我们的方法):你点了一份牛排。 服务员说:“好的,稍等!我这就通知厨师。”然后走开(BackgroundTasks)。 几分钟后,他们给你端来一些面包(“正在准备!”)。 过一会儿,你的饮料也上来了(“快做好了!”)。 在主菜准备好的同时,你感到开心且被告知进度。
这正是我们今天要构建的 AI Web 应用 所要实现的效果。 我们的应用将:
- 立即确认用户的请求。
- 将繁重的 AI 工作卸载到 BackgroundTasks 中。
- 通过 Server-Sent Events (SSE) 将实时进度更新流式传输回用户。
实现此目标的工具包:
- FastAPI BackgroundTasks:用于运行 AI 作业,而不会冻结应用。
- Server-Sent Events (SSE):一种从服务器向浏览器推送实时更新的简单方法。
- PocketFlow:用于组织我们的多步骤文章撰写流程。
3. 工具介绍:FastAPI BackgroundTasks 和 Server-Sent Events (SSE)
为了构建响应迅速的 AI Web 应用 文章编写器,我们需要两项关键技术:一项用于在后台处理工作,另一项用于报告其进度。
3.1 FastAPI BackgroundTasks:后台工作者
将 BackgroundTasks 视为将任务交给独立工作的助手。 你告诉 FastAPI:“嘿,在通知用户我收到他们的请求后,请在后台运行这个函数。” 关键在于,用户会立即收到响应。 他们不必等待后台工作完成。
这就像从亚马逊订购商品。 你会立即收到“订单已确认!”的电子邮件。 实际的拣货、包装和运输过程稍后会在后台进行。
下面是一个简单的示例:在用户注册后发送欢迎电子邮件。
from fastapi import FastAPI, BackgroundTasks
app = FastAPI()
# 这是一个耗时的任务(例如,调用电子邮件服务)
def send_welcome_email(email: str):
import time
time.sleep(5) # 模拟 5 秒延迟
print(f"Email sent to {email}! (This prints in the server console)")
@app.post("/signup")
async def user_signup(email: str, background_tasks: BackgroundTasks):
# 将耗时的电子邮件任务添加到后台
background_tasks.add_task(send_welcome_email, email)
# 立即向用户返回响应
return {"message": f"Thanks for signing up, {email}! Check your inbox soon."}
发生了什么? 当你向 /signup
发送请求时,FastAPI 会看到 background_tasks.add_task(...)
。 它会立即发回 {"message": "Thanks..."}
响应。 你的浏览器很开心且响应迅速。 发送响应后,FastAPI 会在后台运行 send_welcome_email()
。
这非常适合我们的 AI Web 应用 文章生成器! 我们将使用它来运行整个 AI 写作过程。
3.2 Server-Sent Events (SSE):你的实时进度指示器
工作正在后台进行。 但我们如何告诉用户发生了什么? 这就是 Server-Sent Events (SSE) 的用武之地。 SSE 是一种超简单的方式,让服务器通过单个单向连接向浏览器推送更新。 这就像一个实时新闻滚动条:服务器会在新标题出现时发送它们,而你的浏览器只需监听。
为什么不再次使用 WebSockets? WebSockets(来自第 3 部分)非常适合双向聊天。 但对于仅发送单向进度更新,它们有点像使用对讲机,而你只需要一个寻呼机。 SSE 更简单、更轻巧,并且专为此“服务器到客户端”的推送场景而设计。
以下是 FastAPI 中 SSE 端点有多简单:
import asyncio, json
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
app = FastAPI()
async def progress_generator():
for i in range(1, 6):
# 在实际应用中,这将是来自我们 AI 的进度更新
yield f"data: {json.dumps({'progress': i*20, 'step': f'Step {i} done'})}\n\n"
await asyncio.sleep(1) # 等待 1 秒
@app.get("/stream-progress")
async def stream_progress():
return StreamingResponse(progress_generator(), media_type="text/event-stream")
在浏览器端,JavaScript 同样简单:
<div id="progressStatus">Waiting for progress...</div>
<script>
const progressStatus = document.getElementById('progressStatus');
const eventSource = new EventSource("/stream-progress"); // Connect to our stream!
eventSource.onmessage = (event) => {
const data = JSON.parse(event.data);
progressStatus.textContent = `Progress: ${data.progress}% - ${data.step}`;
};
</script>
当你打开此页面时,eventSource
会连接到我们的端点,并且 progressStatus
div 将每秒更新一次。 简单而有效!
4. AI 的待办事项清单:PocketFlow 工作流
那么,我们的 AI 如何实际撰写文章? 这不是一蹴而就的。 我们需要为它提供一个循序渐进的计划,就像食谱一样。 这就是 PocketFlow 帮助我们的地方。 它将“撰写文章”的大任务分解为小的、可管理的节点(或步骤)。
4.1 中央枢纽:我们的共享字典
在深入了解节点之前,让我们看一下我们操作的核心:一个简单的 Python 字典。 我们所有的节点都将读取和写入这个中央共享数据中心。 这是它们相互传递信息的方式。
这是作业开始时的样子:
shared_data = {
"topic": "The user's article topic",
"sse_queue": None, # The "mailbox" for progress messages
"sections": [], # Will be filled by the Outline node
"draft": "", # Will be filled by the Content node
"final_article": "" # The final result!
}
sse_queue
是我们的“邮箱”。 每个节点都会将进度更新放入其中,我们的 FastAPI 服务器将从中读取更新用户。
4.2 步骤 1:GenerateOutline 节点
此节点的唯一工作是创建文章的结构。 (我们假设存在一个 call_llm
函数,它与 LLM 通信)。
class GenerateOutline(Node):
def prep(self, shared):
return shared["topic"]
def exec(self, topic):
prompt = f"Create 3 section titles for an article on '{topic}'."
return call_llm(prompt) # e.g., "Intro,Main Points,Conclusion"
def post(self, shared, outline_str):
sections = outline_str.split(',')
shared["sections"] = sections
progress = {"step": "outline", "progress": 33, "data": sections}
shared["sse_queue"].put_nowait(progress)
这里发生了什么:
prep
从共享字典中获取用户的主题。exec
接受该主题并要求 LLM 提供一个大纲。post
保存大纲以供下一步使用,并且最重要的是,将进度消息放入邮箱中。 这告诉前端,“嘿,我已经完成 33% 了!”
4.3 步骤 2:WriteContent 节点
此节点采用大纲并为每个部分编写内容。
class WriteContent(Node):
def prep(self, shared):
return shared["sections"], shared["sse_queue"]
def exec(self, prep_result):
sections, queue = prep_result
full_draft = ""
for i, section in enumerate(sections):
prompt = f"Write a paragraph for the section: '{section}'."
paragraph = call_llm(prompt)
full_draft += f"<h2>{section}</h2>\n<p>{paragraph}</p>\n"
progress = {"step": "writing", "progress": 33 + ((i+1)*20)}
queue.put_nowait(progress)
return full_draft
def post(self, shared, full_draft):
shared["draft"] = full_draft
它的优点: 在 exec
循环内部,在编写每个段落之后,我们立即发送另一个进度更新。 这意味着用户将在此步骤中看到进度条多次向前跳跃,从而使应用感觉非常响应迅速。
4.4 步骤 3:ApplyStyle 节点
这是最后的润色。 它采用组合的草稿并要求 LLM 对其进行润色。
class ApplyStyle(Node):
def prep(self, shared):
return shared["draft"]
def exec(self, draft):
prompt = f"Rewrite this draft in an engaging style: {draft[:500]}"
return call_llm(prompt)
def post(self, shared, final_article):
shared["final_article"] = final_article
progress = {"step": "complete", "progress": 100, "data": final_article}
shared["sse_queue"].put_nowait(progress)
最后的结局: 此节点进行最后的重写。 至关重要的是,它将 complete
消息与 progress: 100
一起发送到邮箱。 这告诉我们的前端,作业已完成并且最终文章已准备就绪!
4.5 将所有内容与 Flow 连接起来
最后,我们只需要告诉 PocketFlow 我们的待办事项列表的顺序。
from pocketflow import Flow
def create_article_flow():
outline_node = GenerateOutline()
content_node = WriteContent()
style_node = ApplyStyle()
# Define the sequence: outline -> write -> style
outline_node >> content_node >> style_node
return Flow(start_node=outline_node)
这非常易读:outline_node
运行,然后是 content_node
,然后是 style_node
。 此 Flow
对象是我们的 FastAPI BackgroundTasks 最终将运行的对象。
有了 AI 的“待办事项列表”,让我们将其连接到我们的 FastAPI 后端。
5. 连接各个部分:FastAPI 后端
AI 已经有了 PocketFlow 的“待办事项列表”。 现在,让我们构建一个 Web 服务器,作为项目经理。 它将接收来自用户的请求,将工作交给我们的 AI,并报告进度。
我们将逐步完成主要的 main.py
文件。
5.1 作业中心
首先,我们需要一个地方来跟踪当前正在运行的所有文章撰写作业。 一个简单的 Python 字典非常适合此目的。
# A dictionary to hold our active jobs
# Key: A unique job_id (string)
# Value: The "mailbox" (asyncio.Queue) for that job's messages
active_jobs = {}
将 active_jobs
视为办公室的前台。 当新作业进来时,我们给它一个工单号 (job_id
) 和一个专用的邮箱 (asyncio.Queue
),用于所有内部备忘录。
5.2 启动作业
这是用户与之交互的第一件事。 他们将他们的文章主题发送到我们的 /start-job
端点,这会启动整个过程。
@app.post("/start-job")
async def start_job(background_tasks: BackgroundTasks, topic: str = Form(...)):
job_id = str(uuid.uuid4())
# Create a new, empty mailbox for this specific job
sse_queue = asyncio.Queue()
active_jobs[job_id] = sse_queue
# Tell FastAPI: "Run this function in the background"
background_tasks.add_task(run_article_workflow, job_id, topic)
# IMMEDIATELY send a response back to the user
return {"job_id": job_id}
分解一下:
- 获取工单号:
job_id = str(uuid.uuid4())
为此请求创建一个唯一 ID。 - 创建邮箱:
sse_queue = asyncio.Queue()
创建消息队列。 然后我们使用job_id
将其存储在我们的active_jobs
字典中。 - 移交工作:
background_tasks.add_task(...)
是魔术。 它告诉 FastAPI,“不要等待! 在你发送响应后,开始运行run_article_workflow
函数。” - 即时回复:
return {"job_id": job_id}
立即发送回用户的浏览器。 用户的页面不会冻结!
5.3 后台工作者
这是在后台运行的函数。 它是实际运行我们的 PocketFlow 待办事项列表的项目经理。
def run_article_workflow(job_id: str, topic: str):
sse_queue = active_jobs[job_id]
shared = {
"topic": topic,
"sse_queue": sse_queue, # Here's where we pass the mailbox in!
"sections": [],
"draft": "",
"final_article": ""
}
flow = create_article_flow()
flow.run(shared) # Start the PocketFlow!
这是关键的连接:
- 它从我们的
active_jobs
字典中获取此作业的正确sse_queue
(邮箱)。 - 它创建共享的数据字典并将
sse_queue
放入其中。
当调用 flow.run(shared)
时,我们的 PocketFlow 节点现在可以访问此队列,并将它们的进度消息放入其中!
5.4 流式传输进度更新
当后台任务运行时,用户的浏览器连接到我们的 /progress/{job_id}
端点以监听更新。
此函数看起来有点复杂,但它只是一个检查邮箱的循环。
@app.get("/progress/{job_id}")
async def get_progress(job_id: str):
async def event_stream():
# First, find the right mailbox for this job
if job_id not in active_jobs:
yield 'data: {"error": "Job not found"}\n\n'
return
sse_queue = active_jobs[job_id]
while True:
# Wait for a new message to arrive in the mailbox
progress_msg = await sse_queue.get()
yield f"data: {json.dumps(progress_msg)}\n\n"
# If the message says "complete", we're done!
if progress_msg.get("step") == "complete":
del active_jobs[job_id] # Clean up the job
break
return StreamingResponse(event_stream(), media_type="text/event-stream")
逻辑很简单:
event_stream
是一个特殊的异步生成器,可以随时间发送 (yield) 数据。- 它找到
job_id
的正确邮箱 (sse_queue
)。 while True:
循环开始。await sse_queue.get()
暂停并等待直到消息出现在邮箱中。- 一旦 PocketFlow 节点将消息放入,此行就会醒来,获取消息,并使用
yield
将其发送到浏览器。 - 它会一直这样做,直到看到
"step": "complete"
消息,此时它会清理并关闭连接。
这就是整个系统! 这是一个干净的循环:用户启动一个作业,后台工作者运行它,同时将消息放入邮箱中,而流媒体从该邮箱中读取以保持用户更新。
6. 任务完成! 你的应用现在可以处理繁重的工作
你成功了! 你已成功构建了一个 Web 应用,可以处理长时间、复杂的 AI 任务,而不会崩溃或让你的用户感到沮丧。 他们会收到即时响应和实时进度更新,从而使整个体验感觉流畅、交互和专业。
再也没有可怕的加载指示器或“页面无响应”错误。 你的 AI Web 应用 现在就像一个现代的智能助手:它会确认你的请求,在后台努力工作,并让你随时了解每一步。
你今天取得的成就:
- 不再冻结 UI:使用 FastAPI 的 BackgroundTasks 卸载繁重的 AI 工作,因此你的应用保持响应。
- 实时进度更新:掌握 Server-Sent Events (SSE) 以将状态更新从服务器实时流式传输到浏览器。
- 干净、有组织的逻辑:使用 PocketFlow 构造复杂的、多步骤的 AI 作业,使你的 AI 逻辑与你的 Web 代码分离。
- 将所有内容连接起来:使用
asyncio.Queue
作为简单的“邮箱”,让你的 BackgroundTasks 与你的 Web 服务器通信。
这种架构是构建严肃的 AI 应用的改变游戏规则的方法。 你现在拥有创建工具的技能,这些工具可以生成报告、分析数据或执行任何其他耗时的任务,同时让你的用户感到满意和参与。
总而言之,利用 FastAPI、BackgroundTasks 和 SSE 技术,我们能够创建一个高效的 AI Web 应用,它不仅能够处理复杂的 LLM 任务,还能提供卓越的用户体验。无论是数据分析、内容生成还是其他 AI 驱动的应用场景,这种架构都将助力开发者构建更加智能、响应迅速且易于管理的解决方案。拥抱这些技术,开启你的 AI 开发新篇章。