本文将深入探讨如何利用 Airflow 3.0 及其强大的 Airflow AI SDK 构建一个现实世界的 MLOps 应用,旨在提升 Astronomer 支持团队的生产力。通过 Airflow 的强大调度能力,结合本地部署的 LLM,该应用能够识别与新提交的支持工单相关的历史工单,并提供相关信息,帮助工程师更高效地解决问题。这个案例展示了 AirflowLLM 应用中的巨大潜力,以及如何克服 MLOps 在实际应用中面临的挑战。

Airflow 3.0:MLOps 的变革性发布

Airflow 3.0 的发布对于 MLOps 领域来说是一次变革。它带来了一些关键特性,极大地提升了用户构建和维护机器学习应用的能力。首先,Airflow 3.0 原生支持 backfill,这使得高效加载历史训练数据成为可能。这意味着我们可以轻松地将大量历史工单数据导入到系统中,为后续的 LLM 模型训练和推理提供充足的素材。

其次,Airflow 3.0 不再要求 DAG 运行具有唯一的执行日期,这简化了实验跟踪和超参数调整。在 LLM 模型开发过程中,我们经常需要进行大量的实验,尝试不同的模型参数和训练策略。Airflow 3.0 的这一特性使得我们能够更好地管理这些实验,并追踪每次实验的结果,从而更快地找到最佳的模型配置。

最后,Airflow 3.0 原生支持事件驱动的调度,可以快速触发推理任务。例如,当监控工业设备的传感器超过某个阈值时,可以立即触发一个 DAG 来分析遥测数据,更准确地检测设备故障。在 Astronomer 的支持场景中,当一个新的支持工单被提交时,我们可以利用这一特性立即触发一个 DAG 来识别相关的历史工单,并为工程师提供有用的信息。

这些改进使得 Airflow 3.0 成为构建和部署 MLOps 应用的理想平台。

Airflow AI SDK:简化 LLM 应用开发

Airflow 3.0 发布之前,Astronomer 发布了 Airflow AI SDK,旨在简化 MLOps DAG 的编写。Airflow AI SDK 使得构建与 LLMLLM 相关功能(如 RAG 的嵌入)交互的 Airflow DAG 变得更加 Pythonic 和直观。

Airflow AI SDK 最吸引人的特性之一是它允许用户在 Airflow 本地部署模型,从而将所有数据保存在 Airflow 环境中。这意味着我们可以避免将敏感的客户数据发送给 OpenAI 或其他 LLM 提供商,从而确保数据的安全性和隐私性。这对于 Astronomer 来说至关重要,因为他们需要处理大量的客户数据,并且必须遵守严格的数据安全规定。

具体来说,文章作者使用了 Airflow AI SDK@task.llm@task.embed 装饰器来简化 LLM 相关任务的编写。@task.llm 装饰器允许用户轻松地调用 LLM 模型来执行文本摘要等任务,而 @task.embed 装饰器则可以用于生成文本的嵌入向量,以便进行相似性搜索。

Ollama:本地部署 LLM 模型

文章中提到了使用 Ollama 在 Airflow worker 节点上运行本地 LLM 模型。Ollama 允许开发者在本地轻松部署和运行各种开源 LLM 模型,无需依赖外部云服务。这对于数据安全敏感或者需要在离线环境下运行 LLM 应用的场景非常有用。

文章作者使用 Ollama 运行了 llama3.2 模型,并将其用于工单摘要生成。通过在 Airflow DAG 中使用 @task.bash 装饰器,可以方便地启动和管理 Ollama 实例。

from airflow.decorators import task
from pydantic_ai.models.openai import OpenAIModel
from pydantic_ai.providers.openai import OpenAIProvider

ollama_model = OpenAIModel(
    model_name="llama3.2:latest",
    provider=OpenAIProvider(
        # change this to your ollama host if it's different
        base_url="http://localhost:11434/v1"
    ),
)

@task.bash(queue="ollama-model")
def start_ollama():
    """
    Runs llama3.2 on a Celery worker via ollama
    """
    return """
    if ! lsof -i :11434 >/dev/null 2>&1; then
        echo "Starting Ollama..."
        nohup ollama serve > ollama.log 2>&1 &
        for i in {1..60}; do
            if curl -sf http://localhost:11434/v1/models > /dev/null; then
                echo "Ollama is ready."
                exit 0
            fi
            sleep 0.5
        done
        echo "Ollama failed to start in time."
        exit 1
    else
        echo "Ollama already running on port 11434."
    fi
    """

向量数据库:高效的相似性搜索

为了识别相关的历史工单,我们需要将工单数据嵌入到向量空间中,并使用向量数据库来存储和索引这些嵌入向量。当一个新的工单被提交时,我们可以将它的嵌入向量与向量数据库中的所有嵌入向量进行比较,找到最相似的几个向量,从而找到相关的历史工单。

文章作者使用 Snowflake 作为向量数据库,因为它已经存储了所有的工单数据。这使得他们能够轻松地将工单数据嵌入到向量空间中,并将其存储在 Snowflake 的一个表中。

@task.embed(map_index_template="{{ ticket_id }}")
def embed_ticket_summaries(ticket_data, summary):
    """
    Use Airflow AI SDK to embed the ticket summaries
    """
    context = get_current_context()
    context['ticket_id'] = ticket_data["TICKET_ID"]
    return summary

embedded_ticket_summaries = embed_ticket_summaries.expand(ticket_data=ticket_data, summary=ticket_summaries)

RAG:检索增强生成

文章最初的目标是使用检索增强生成 (RAG) 自动为新的工单提出解决方案。RAG 是一种将检索模型和生成模型结合起来的技术,它可以从一个大型的知识库中检索相关的信息,并将这些信息作为生成模型的输入,从而生成更加准确和相关的答案。

然而,文章作者发现,当前的 RAG 技术还不够成熟,无法为所有的工单提供准确的解决方案。因此,他们暂时放弃了自动生成解决方案的目标,而是专注于提供与新工单相关的历史工单和知识库文章。

@task
def search_embeddings(vector_string):
    hook = SnowflakeHook(snowflake_conn_id="snowflake_cre_data")
    sql = f"""
            SELECT
            id AS ticket_id,
            summary
            FROM SANDBOX.FIELDENGINEER.TICKET_EMBEDDINGS
            ORDER BY
            VECTOR_COSINE_SIMILARITY(
                vector,
                ARRAY_CONSTRUCT{tuple(vector_string)}::VECTOR(FLOAT, 384)
            ) DESC
            LIMIT 3;
            """
    return hook.get_pandas_df(sql).to_dict(orient="records")

事件驱动调度:实时响应

Airflow 3 引入了资产监视器 (AssetWatcher) 的概念,虽然目前还没有现成的 HTTP 资产监视器,但文章作者利用 Zendesk 的 webhook 功能来触发 Airflow REST API,从而实现事件驱动的调度。当一个新的工单被提交时,Zendesk 会自动触发 Airflow DAG 的运行,并将工单 ID 作为 payload 传递给 DAG。

通过这种方式,我们可以实现对新工单的实时响应,并立即开始搜索相关的历史工单和知识库文章。

未来展望:持续改进和优化

虽然当前的系统还不能自动生成准确的解决方案,但它已经能够为 Astronomer 的支持工程师提供有用的信息,帮助他们更高效地解决问题。未来,文章作者计划继续改进和优化系统,提高 RAG 模型的准确性和可靠性,最终实现自动生成解决方案的目标。

这包括:

  • 收集更多的训练数据
  • 尝试不同的 LLM 模型
  • 优化嵌入向量的生成策略
  • 改进相似性搜索算法

通过持续的努力,我们可以将 Airflow 及其 AI SDK 打造成为一个强大的 MLOps 平台,帮助 Astronomer 以及其他企业更好地利用 LLM 技术,提升生产力并创造价值。

总结:Airflow 在 MLOps 中的关键作用

总而言之,这篇文章展示了 Airflow 3.0Airflow AI SDK 如何简化 MLOps 应用的构建和部署,特别是结合 LLM 的应用。通过利用 Airflow 的强大调度能力、Airflow AI SDK 的便捷 API 以及本地部署的 LLM 模型,Astronomer 成功构建了一个能够提升支持团队效率的系统。这个案例强调了 AirflowMLOps 领域的关键作用,并为其他企业提供了一个有价值的参考。尽管 RAG 技术目前还存在一些局限性,但随着技术的不断发展,我们有理由相信,Airflow 将在未来的 MLOps 应用中发挥更大的作用。

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注