本文将深入探讨如何利用 Airflow 3.0 及其强大的 Airflow AI SDK 构建一个现实世界的 MLOps 应用,旨在提升 Astronomer 支持团队的生产力。通过 Airflow 的强大调度能力,结合本地部署的 LLM,该应用能够识别与新提交的支持工单相关的历史工单,并提供相关信息,帮助工程师更高效地解决问题。这个案例展示了 Airflow 在 LLM 应用中的巨大潜力,以及如何克服 MLOps 在实际应用中面临的挑战。
Airflow 3.0:MLOps 的变革性发布
Airflow 3.0 的发布对于 MLOps 领域来说是一次变革。它带来了一些关键特性,极大地提升了用户构建和维护机器学习应用的能力。首先,Airflow 3.0 原生支持 backfill,这使得高效加载历史训练数据成为可能。这意味着我们可以轻松地将大量历史工单数据导入到系统中,为后续的 LLM 模型训练和推理提供充足的素材。
其次,Airflow 3.0 不再要求 DAG 运行具有唯一的执行日期,这简化了实验跟踪和超参数调整。在 LLM 模型开发过程中,我们经常需要进行大量的实验,尝试不同的模型参数和训练策略。Airflow 3.0 的这一特性使得我们能够更好地管理这些实验,并追踪每次实验的结果,从而更快地找到最佳的模型配置。
最后,Airflow 3.0 原生支持事件驱动的调度,可以快速触发推理任务。例如,当监控工业设备的传感器超过某个阈值时,可以立即触发一个 DAG 来分析遥测数据,更准确地检测设备故障。在 Astronomer 的支持场景中,当一个新的支持工单被提交时,我们可以利用这一特性立即触发一个 DAG 来识别相关的历史工单,并为工程师提供有用的信息。
这些改进使得 Airflow 3.0 成为构建和部署 MLOps 应用的理想平台。
Airflow AI SDK:简化 LLM 应用开发
在 Airflow 3.0 发布之前,Astronomer 发布了 Airflow AI SDK,旨在简化 MLOps DAG 的编写。Airflow AI SDK 使得构建与 LLM 和 LLM 相关功能(如 RAG 的嵌入)交互的 Airflow DAG 变得更加 Pythonic 和直观。
Airflow AI SDK 最吸引人的特性之一是它允许用户在 Airflow 本地部署模型,从而将所有数据保存在 Airflow 环境中。这意味着我们可以避免将敏感的客户数据发送给 OpenAI 或其他 LLM 提供商,从而确保数据的安全性和隐私性。这对于 Astronomer 来说至关重要,因为他们需要处理大量的客户数据,并且必须遵守严格的数据安全规定。
具体来说,文章作者使用了 Airflow AI SDK 的 @task.llm
和 @task.embed
装饰器来简化 LLM 相关任务的编写。@task.llm
装饰器允许用户轻松地调用 LLM 模型来执行文本摘要等任务,而 @task.embed
装饰器则可以用于生成文本的嵌入向量,以便进行相似性搜索。
Ollama:本地部署 LLM 模型
文章中提到了使用 Ollama 在 Airflow worker 节点上运行本地 LLM 模型。Ollama 允许开发者在本地轻松部署和运行各种开源 LLM 模型,无需依赖外部云服务。这对于数据安全敏感或者需要在离线环境下运行 LLM 应用的场景非常有用。
文章作者使用 Ollama 运行了 llama3.2 模型,并将其用于工单摘要生成。通过在 Airflow DAG 中使用 @task.bash
装饰器,可以方便地启动和管理 Ollama 实例。
from airflow.decorators import task
from pydantic_ai.models.openai import OpenAIModel
from pydantic_ai.providers.openai import OpenAIProvider
ollama_model = OpenAIModel(
model_name="llama3.2:latest",
provider=OpenAIProvider(
# change this to your ollama host if it's different
base_url="http://localhost:11434/v1"
),
)
@task.bash(queue="ollama-model")
def start_ollama():
"""
Runs llama3.2 on a Celery worker via ollama
"""
return """
if ! lsof -i :11434 >/dev/null 2>&1; then
echo "Starting Ollama..."
nohup ollama serve > ollama.log 2>&1 &
for i in {1..60}; do
if curl -sf http://localhost:11434/v1/models > /dev/null; then
echo "Ollama is ready."
exit 0
fi
sleep 0.5
done
echo "Ollama failed to start in time."
exit 1
else
echo "Ollama already running on port 11434."
fi
"""
向量数据库:高效的相似性搜索
为了识别相关的历史工单,我们需要将工单数据嵌入到向量空间中,并使用向量数据库来存储和索引这些嵌入向量。当一个新的工单被提交时,我们可以将它的嵌入向量与向量数据库中的所有嵌入向量进行比较,找到最相似的几个向量,从而找到相关的历史工单。
文章作者使用 Snowflake 作为向量数据库,因为它已经存储了所有的工单数据。这使得他们能够轻松地将工单数据嵌入到向量空间中,并将其存储在 Snowflake 的一个表中。
@task.embed(map_index_template="{{ ticket_id }}")
def embed_ticket_summaries(ticket_data, summary):
"""
Use Airflow AI SDK to embed the ticket summaries
"""
context = get_current_context()
context['ticket_id'] = ticket_data["TICKET_ID"]
return summary
embedded_ticket_summaries = embed_ticket_summaries.expand(ticket_data=ticket_data, summary=ticket_summaries)
RAG:检索增强生成
文章最初的目标是使用检索增强生成 (RAG) 自动为新的工单提出解决方案。RAG 是一种将检索模型和生成模型结合起来的技术,它可以从一个大型的知识库中检索相关的信息,并将这些信息作为生成模型的输入,从而生成更加准确和相关的答案。
然而,文章作者发现,当前的 RAG 技术还不够成熟,无法为所有的工单提供准确的解决方案。因此,他们暂时放弃了自动生成解决方案的目标,而是专注于提供与新工单相关的历史工单和知识库文章。
@task
def search_embeddings(vector_string):
hook = SnowflakeHook(snowflake_conn_id="snowflake_cre_data")
sql = f"""
SELECT
id AS ticket_id,
summary
FROM SANDBOX.FIELDENGINEER.TICKET_EMBEDDINGS
ORDER BY
VECTOR_COSINE_SIMILARITY(
vector,
ARRAY_CONSTRUCT{tuple(vector_string)}::VECTOR(FLOAT, 384)
) DESC
LIMIT 3;
"""
return hook.get_pandas_df(sql).to_dict(orient="records")
事件驱动调度:实时响应
Airflow 3 引入了资产监视器 (AssetWatcher) 的概念,虽然目前还没有现成的 HTTP 资产监视器,但文章作者利用 Zendesk 的 webhook 功能来触发 Airflow REST API,从而实现事件驱动的调度。当一个新的工单被提交时,Zendesk 会自动触发 Airflow DAG 的运行,并将工单 ID 作为 payload 传递给 DAG。
通过这种方式,我们可以实现对新工单的实时响应,并立即开始搜索相关的历史工单和知识库文章。
未来展望:持续改进和优化
虽然当前的系统还不能自动生成准确的解决方案,但它已经能够为 Astronomer 的支持工程师提供有用的信息,帮助他们更高效地解决问题。未来,文章作者计划继续改进和优化系统,提高 RAG 模型的准确性和可靠性,最终实现自动生成解决方案的目标。
这包括:
- 收集更多的训练数据
- 尝试不同的 LLM 模型
- 优化嵌入向量的生成策略
- 改进相似性搜索算法
通过持续的努力,我们可以将 Airflow 及其 AI SDK 打造成为一个强大的 MLOps 平台,帮助 Astronomer 以及其他企业更好地利用 LLM 技术,提升生产力并创造价值。
总结:Airflow 在 MLOps 中的关键作用
总而言之,这篇文章展示了 Airflow 3.0 和 Airflow AI SDK 如何简化 MLOps 应用的构建和部署,特别是结合 LLM 的应用。通过利用 Airflow 的强大调度能力、Airflow AI SDK 的便捷 API 以及本地部署的 LLM 模型,Astronomer 成功构建了一个能够提升支持团队效率的系统。这个案例强调了 Airflow 在 MLOps 领域的关键作用,并为其他企业提供了一个有价值的参考。尽管 RAG 技术目前还存在一些局限性,但随着技术的不断发展,我们有理由相信,Airflow 将在未来的 MLOps 应用中发挥更大的作用。