大模型技术日新月异的今天,如何让其更准确、更及时地响应用户的查询需求,并结合外部知识提供更符合语境的答案,成为了一个重要的研究方向。检索增强生成 (Retrieval Augmented Generation, RAG) 管道正在革新大模型与外部知识交互的方式。本文将深入探讨如何在离线环境(air-gapped environment)下,利用ChromaDB向量数据库构建安全的RAG管道,实现高效的Text-to-SQL生成。

知识库准备:从SQLite到向量嵌入

构建有效的RAG管道,第一步是对知识库进行精心的准备和嵌入。这一步涉及将原始数据转换为大模型能够理解和高效搜索的数值格式(向量)。例如,一个包含电影、演员和租赁信息的SQLite数据库,其原始表格结构对大模型而言是难以直接理解的。通过将数据库模式和文本内容转换为向量嵌入,我们可以让大模型能够基于语义相似性快速检索和比较信息。

具体而言,文章中提到的做法是将SQLite数据库中的表名、列名、主键外键关系等信息,通过get_db_schema_for_embedding函数提取出来,并将这些信息转换成易于理解的字符串格式。例如,对于actor表,会生成如下的描述字符串:

Table: actor
Columns: actor_id INTEGER PRIMARY KEY, first_name TEXT NOT NULL, last_name TEXT NOT NULL, last_update TEXT NOT NULL

这些结构化的模式描述将被输入到嵌入模型中,使RAG管道在回答问题时能够理解数据库的结构。

ChromaDB:构建本地向量数据库

向量数据库是存储和高效查询向量嵌入的关键组件。ChromaDB因其易用性和能够构建本地持久化数据库的特性,成为了本文案例的首选。通过chromadb.PersistentClient初始化ChromaDB客户端,可以将向量数据存储在指定的本地目录中,即使脚本运行结束后,数据也不会丢失,这对于离线环境至关重要。

文章中,作者通过以下代码初始化ChromaDB客户端,并创建或加载一个名为”sqlite_texts”的collection:

client = chromadb.PersistentClient(path=embedding_dir_path)
collection = client.get_or_create_collection(
    name=CHROMA_COLLECTION_NAME,
    embedding_function=embedding_function,
    metadata={
        "description": f"Embedded texts and schemas from {db_name} SQLite DB",
        "created": str(datetime.now())
    }
)

其中,embedding_function是用于将文本转换成向量嵌入的SentenceTransformerEmbeddingFunction。这个函数确保了后续添加到collection中的文本,以及查询文本,都使用相同的嵌入模型,从而保证语义搜索的准确性。

语义检索:理解用户意图

语义检索RAG管道的核心环节,它负责根据用户的提问,从向量数据库中找到最相关的上下文信息。这个过程依赖于将用户的提问也转换成向量嵌入,然后与数据库中的向量嵌入进行比较。即使提问中没有包含数据库中的确切关键词,只要语义相似,就能够被检索出来。

例如,用户提问“Show me all films in the ‘Action’ category”,系统会将这个提问转换成向量,然后在ChromaDB中找到与”Action”类别相关的电影。关键在于,即使数据库中没有直接包含“Show me all films”这样的句子,由于向量空间中的相似性,系统仍然能够找到正确的类别和电影信息。

SQL生成:利用本地大模型

离线环境下,使用本地大模型进行SQL生成是关键。文章中使用了transformers库加载一个本地的、针对SQL任务进行微调过的大模型AutoTokenizer.from_pretrainedAutoModelForCausalLM.from_pretrained函数加载了模型和分词器,并设置local_files_only=True以确保完全从本地加载,无需联网。

作者创建了一个pipeline对象,用于简化与大模型的交互。其中,do_sample=False参数设置保证了SQL生成结果的确定性,对于需要准确性的SQL生成任务至关重要。Few-shot learning(少样本学习)也被应用,通过在prompt中包含几个示例,可以显著提高模型生成SQL的准确率。例如:

### Example 1:
### Database Schema:
CREATE TABLE actor (  actor_id INTEGER PRIMARY KEY,  first_name TEXT NOT NULL,  last_name TEXT NOT NULL,  last_update TIMESTAMP DEFAULT CURRENT_TIMESTAMP NOT NULL);
CREATE TABLE film (  film_id INTEGER PRIMARY KEY,  title TEXT NOT NULL,  description TEXT,  release_year INTEGER,  rental_duration INTEGER NOT NULL,  rental_rate NUMERIC(4,2) NOT NULL,  length INTEGER,  replacement_cost NUMERIC(5,2) NOT NULL,  rating TEXT DEFAULT 'G',  last_update TIMESTAMP DEFAULT CURRENT_TIMESTAMP NOT NULL,  special_features TEXT,  fulltext TEXT);
CREATE TABLE film_actor (  actor_id INTEGER NOT NULL,  film_id INTEGER NOT NULL,  last_update TIMESTAMP DEFAULT CURRENT_TIMESTAMP NOT NULL,  PRIMARY KEY (actor_id, film_id),  FOREIGN KEY (actor_id) REFERENCES actor(actor_id),  FOREIGN KEY (film_id) REFERENCES film(film_id));
### Context (Data Examples):
-- Example data for actor.first_name: PENELOPE
-- Example data for actor.last_name: GUINESS
-- Example data for film.title: ACADEMY DINOSAUR
### User Request:
How many films did the actor 'PENELOPE GUINESS' appear in?
### SQL Query:
SELECT  COUNT(T1.film_id)
FROM film_actor AS T1
INNER JOIN actor AS T2
ON T1.actor_id = T2.actor_id
WHERE  T2.first_name = 'PENELOPE' AND T2.last_name = 'GUINESS';

这个示例向模型展示了如何根据数据库模式、上下文和用户请求生成SQL查询,有助于提高模型在类似场景下的表现。

SQL执行与结果呈现:保证安全与准确性

在生成SQL查询之后,下一步是执行查询并返回结果。为了保证安全性,文章中加入了SQL验证步骤,确保只执行SELECT查询,避免执行可能修改或删除数据的语句。execute_sql_query函数连接到SQLite数据库,执行SQL查询,并返回查询结果和列名。

def execute_sql_query(sql_query: str, db_path: str = SQLITE_FILE) -> Tuple[List[Tuple], List[str]]:
    """
    Executes a SQL query against the SQLite database and returns results and column names.
    Raises ValueError if the query is not a SELECT statement.
    """
    conn = None
    results = []
    column_names = []
    try:
        # Basic validation: ensure it's a SELECT query
        if not sql_query.strip().upper().startswith("SELECT"):
            raise ValueError("Only SELECT queries are allowed for execution by this system.")

        conn = sqlite3.connect(db_path)
        cursor = conn.cursor()
        cursor.execute(sql_query)

        if cursor.description: # Check if there are columns (for SELECT queries)
            column_names = [description[0] for description in cursor.description]
            results = cursor.fetchall()

        return results, column_names
    except sqlite3.Error as e:
        print(f"Database error executing SQL: {e}")
        raise # Re-raise to be caught by the main loop for error display
    except ValueError as e:
        print(f"SQL Validation Error: {e}")
        raise
    finally:
        if conn:
            conn.close()

函数中加入了try-except-finally块,用于处理数据库错误和确保数据库连接始终关闭,提高了程序的稳定性和健壮性。查询结果以表格形式呈现给用户,方便用户理解。

Air-Gapped环境下的CLI:用户友好的离线体验

为了在离线环境下提供用户友好的交互体验,文章中实现了一个简单的命令行界面 (CLI)。用户可以通过CLI输入自然语言问题,系统会自动生成SQL查询,执行查询,并将结果以表格形式显示给用户。

if __name__ == "__main__":
    print("\n🛡️ Air-Gapped SQL Chatbot Initialized (Offline Mode)")
    print("--------------------------------------------------")
    print("Type your natural language question to get a SQL query and its results.")
    print("Type 'exit' to quit.")
    print("--------------------------------------------------")

    while True:
        user_question = input("\n🔍 Enter your question: ")
        if user_question.lower() == 'exit':
            break

        print("\nWorking on your request...")
        try:
            generated_sql = generate_sql_from_question(user_question)

            if generated_sql == "NO_QUERY_POSSIBLE":
                print("\n❌ Could not generate a meaningful SQL query for your request. Please try rephrasing.")
                continue

            print("\n✅ Generated SQL Query:\n", generated_sql)

            print("\nExecuting SQL query...")
            query_results, col_names = execute_sql_query(generated_sql)

            if col_names:
                print("\n📊 Query Results:")
                # Print header
                print(" | ".join(col_names))
                print("-" * (sum(len(c) for c in col_names) + (len(col_names) - 1) * 3))
                # Print rows
                for row in query_results:
                    print(" | ".join(map(str, row)))
                print(f"\nTotal rows returned: {len(query_results)}")
            else:
                print("\nℹ️ Query executed successfully, but returned no results.")
        except Exception as e:
            print(f"❌ An error occurred during SQL generation or execution: {e}")
            print("Please try rephrasing your question or check the console for details.")

这个CLI程序实现了完整的RAG管道流程,从用户提问到SQL生成和执行,再到结果呈现,都可以在离线环境下完成。

安全性考量:离线环境下的数据保护

离线环境下,安全性变得尤为重要。由于无法依赖外部网络的安全措施,必须采取额外的措施来保护数据和系统。

首先,需要确保大模型和ChromaDB客户端等组件都是从可信的来源获取,并进行严格的验证,防止恶意软件或漏洞。其次,需要对数据库进行加密,防止未经授权的访问。第三,需要对用户的输入进行过滤,防止SQL注入等攻击。第四,要定期备份数据,防止数据丢失。

文章中已经做了一部分安全性措施,比如只允许执行SELECT查询,避免执行可能修改或删除数据的语句。

结论:大模型驱动的离线数据查询解决方案

本文深入探讨了如何在离线环境下,利用ChromaDB向量数据库构建安全的RAG管道,实现高效的Text-to-SQL生成。通过将数据库模式和文本内容转换为向量嵌入,利用本地大模型进行SQL生成,并执行安全验证,最终实现了用户友好的离线数据查询解决方案。虽然这只是一个基础的RAG管道,但它为在离线环境下利用大模型技术提供了一个有价值的参考。未来,可以通过不断地优化嵌入模型、SQL生成模型和prompt工程,进一步提高系统的准确性和鲁棒性,使其更好地服务于各种离线环境下的数据查询需求。

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注