支持团队每天都会收到来自用户的数千条非结构化、未分类的消息。如果没有明确的方法来对这些问题进行分类,就很难确定紧急情况的优先级,将其路由到正确的部门,或识别重复出现的问题。手动分类既耗时又容易出错。但如果我们能使用像 OpenAI 的模型这样的 大模型 (LLM) 来阅读消息线程,理解其潜在的主题,并自动建议一致的类别,会怎么样呢? 本文将探讨如何实现这一目标,主要包括清洗HTML消息,搭建OpenAI客户端,使用Pydantic验证LLM响应,利用OpenAI进行事件消息分类,以及进行类别归一化,最终对事件主题进行分析和可视化。 通过这些步骤,我们可以将生成式AI的力量应用于支持工单的处理流程,极大地提升效率和准确性。

1. 清洗HTML消息:为AI提供干净的数据

支持工单数据往往包含大量的HTML标签,这些标签对于大模型理解文本内容毫无帮助,反而会增加噪声。因此,清洗HTML消息是至关重要的第一步。我们需要从混乱的HTML文档中提取出干净、可读的文本,以便后续的AI处理。

文章中使用BeautifulSoup库来实现HTML清洗。BeautifulSoup是一个强大的Python库,专门用于解析HTML和XML文档。它可以方便地提取HTML标签中的文本内容,并去除不必要的HTML标签。

例如,以下代码展示了如何使用BeautifulSoup清洗HTML消息:

from bs4 import BeautifulSoup
import pandas as pd
import re

def clean_html(text):
    """
    Removes HTML tags and extra whitespace from a message.
    """
    if pd.isna(text):
        return ""
    # Use BeautifulSoup to parse and extract text
    soup = BeautifulSoup(text, 'html.parser')
    clean_text = soup.get_text()

    # Normalize whitespace
    clean_text = re.sub(r'\s+', ' ', clean_text)
    clean_text = clean_text.strip()

    return clean_text

这段代码首先检查输入文本是否为空。如果为空,则直接返回空字符串。否则,它使用BeautifulSoup解析HTML文本,然后使用get_text()方法提取文本内容。最后,它使用正则表达式去除多余的空白字符,并返回清洗后的文本。

清洗HTML消息不仅可以提高大模型的分类准确性,还可以减少计算资源的使用,因为模型需要处理的数据量更少。

2. 搭建OpenAI客户端:连接AI的力量

要使用生成式AI模型,例如OpenAI的模型,来对事件消息进行分类,我们需要连接到OpenAI的API。文章推荐使用python-dotenv库来安全地存储API密钥。

python-dotenv允许我们将API密钥存储在.env文件中,而不是直接硬编码在代码中。这提高了安全性,因为API密钥不会被意外地提交到代码仓库。

以下代码展示了如何使用python-dotenv加载API密钥并初始化OpenAI客户端:

import os
from dotenv import load_dotenv
from openai import OpenAI

# Load environment variables from a .env file
load_dotenv()

# Initialize OpenAI client using your API key
client = OpenAI(api_key=os.getenv("OPENAI_API_KEY"))

这段代码首先使用load_dotenv()函数从.env文件中加载环境变量。然后,它使用os.getenv()函数获取OPENAI_API_KEY环境变量的值,并使用该值初始化OpenAI客户端。

通过搭建OpenAI客户端,我们可以轻松地与OpenAI的大模型进行交互,并利用它们强大的文本处理能力。

3. Pydantic验证LLM响应:确保数据质量

大模型虽然强大,但有时会产生与预期格式略有偏差的输出。为了确保数据质量,我们需要验证大模型的响应。文章介绍了使用Pydantic库来自动验证LLM响应的方法。

Pydantic是一个Python库,用于数据验证和设置管理,它使用Python类型注解。它允许我们使用标准的Python类和类型提示来定义模式(或“模型”),然后自动验证传入的数据是否与预期的结构匹配。

例如,以下代码定义了两个Pydantic模型:

from pydantic import BaseModel, ValidationError
from typing import List, Dict

class CategorizationResult(BaseModel):
    category: str
    explanation: str
    keywords: List[str]

class NormalizationResponse(BaseModel):
    normalized_categories: List[str]
    categories_mapping: Dict[str, str]
    explanations: Dict[str, str]

CategorizationResult模型用于验证大模型分类事件消息的响应,它包含category(类别名称)、explanation(类别解释)和keywords(关键词列表)三个字段。NormalizationResponse模型用于验证类别归一化的响应,它包含normalized_categories(归一化类别列表)、categories_mapping(原始类别到归一化类别的映射)和explanations(归一化类别的解释)三个字段。

使用Pydantic验证大模型的响应可以避免手动解析JSON数据和处理字符串,从而简化代码并提高代码的健壮性。如果响应不符合Pydantic模型的定义,Pydantic会抛出一个ValidationError异常,我们可以捕获该异常并采取适当的措施,例如返回一个默认值。

4. 利用OpenAI进行事件消息分类:智能标签的生成

文章的核心在于如何利用生成式AI,特别是OpenAI的大模型,来对事件消息进行分类。这涉及到构建合适的Prompt,发送给大模型,并解析返回结果。

文章中提供了一个示例Prompt,该Prompt指示大模型分析与特定事件相关的消息,并确定它们所属的主要类别。该Prompt还包含一些重要的规则,例如:

  • 使用一致且规范化的类别名称。
  • 避免创建过于相似的类别。
  • 避免创建单例类别。
  • 避免使用通用术语,例如“事件”。
  • 如果找不到明确的类别,则使用“未分类”。

以下代码展示了如何使用OpenAI客户端和Prompt对事件消息进行分类:

def categorize_messages(messages, incident_id):
    """
    Categorizes a list of messages related to an incident using an LLM.
    The model is prompted to identify the main topic discussed across the messages
    and respond with a normalized category, a short explanation, and relevant keywords.
    Validates the response structure using a Pydantic schema.
    """
    # Construct the prompt for the language model
    prompt = my_prompt

    try:
        response = client.beta.chat.completions.parse(
            model="gpt-4.1",
            messages=[
                {
                    "role": "system",
                    "content": (
                        "You are an expert in message classification. Your task is to analyze "
                        "incident-related messages and group them under consistent, normalized categories. "
                        "Respond ONLY with a valid JSON object matching the exact structure requested."
                    )
                },
                {"role": "user", "content": prompt}
            ],
            temperature=0.5,
            max_tokens=500,
            response_format=CategorizationResult,
        )
        result: CategorizationResult = response.choices[0].message.parsed
        return result.model_dump()
    except Exception as e:
        return {
            "category": "Uncategorized",
            "explanation": f"Error: {str(e)}",
            "keywords": []
        }

这段代码首先构建一个Prompt,该Prompt包含要分类的事件消息。然后,它使用OpenAI客户端的chat.completions.parse()方法将Prompt发送给大模型response_format=CategorizationResult参数告诉OpenAI客户端使用CategorizationResult模型来验证大模型的响应。如果响应验证成功,则将其转换为字典并返回。如果发生任何错误,则返回一个包含“未分类”类别的默认值。

使用大模型进行事件消息分类可以大大提高分类的效率和准确性,因为大模型可以理解消息的上下文和意图,而不仅仅是关键词。

5. 类别归一化:提高分析一致性

在对所有消息进行单独分类后,我们经常会遇到非常相似的标签,但实际上它们指的是同一类型的问题。为了提高一致性并减少分析中的噪音,我们需要对相似的类别进行归一化和分组。文章介绍了使用大模型来进行语义聚类的方法。

文章中提供了一个示例Prompt,该Prompt指示大模型分析类别及其解释和关键词,并将语义相似或等效的类别分组。该Prompt还包含一些重要的规则,例如:

  • 将类别总数减少到最多5个。
  • 保持语义清晰度和上下文。

以下代码展示了如何使用OpenAI客户端和Prompt对类别进行归一化:

def normalize_categories(df_results):
    """
    Normalizes and groups similar categories to a maximum of 5
    using both the category name and its explanation/keywords for better context.
    """
    print("\nNormalizing and grouping similar categories...")

    # Build a dictionary with categories and their context
    categories_with_context = {}
    for _, row in df_results.iterrows():
        if row['category'] not in categories_with_context:
            keywords = row['keywords']
            if isinstance(keywords, list):
                keywords = ', '.join(keywords)
            elif not isinstance(keywords, str):
                keywords = str(keywords)
            categories_with_context[row['category']] = {
                'explanation': row['explanation'],
                'keywords': keywords
            }

    # Build the prompt for OpenAI
    prompt = my_second_prompt

    try:
        response = client.beta.chat.completions.parse(
            model="gpt-4.1",
            messages=[
                {
                    "role": "system",
                    "content": "You are an expert in text classification and category normalization. Group semantically similar categories based on their name, explanation, and keywords."
                },
                {
                    "role": "user",
                    "content": prompt
                }
            ],
            temperature=0.5,
            max_tokens=2000,
            response_format=NormalizationResponse
        )
        result: NormalizationResponse = completion.choices[0].message.parsed

        # Apply the mapping to normalize the categories
        df_results['normalized_categories'] = df_results['category'].map(result['categories_mapping'])
        df_results['normalized_explanation'] = df_results['normalized_category'].map(result['explanations'])

        # Fallback: keep original if normalization failed
        df_results['normalized_category'] = df_results['normalized_category'].fillna(df_results['category'])
        df_results['normalized_explanation'] = df_results['normalized_explanation'].fillna(df_results['explanations'])

        print(f"Categories after normalization: {len(validated.normalized_categories)}")
        print("\nCategory mapping:")
        for original, normalized in validated.categories_mapping.items():
            print(f"{original} -> {normalized}")

        return df_results
    except ValidationError as ve:
        print("Validation error from Pydantic:")
        print(ve)
    except Exception as e:
        print(f"Error normalizing categories: {str(e)}")
        print(f"Response that caused the error: {response_text if 'response_text' in locals() else 'Unavailable'}")

    # Fallback in case of any failure
    df_results['normalized_category'] = df_results['category']
    df_results['normalized_explanation'] = df_results['explanation']
    return df_results

这段代码首先构建一个包含类别及其上下文的字典。然后,它使用OpenAI客户端的chat.completions.parse()方法将Prompt发送给大模型response_format=NormalizationResponse参数告诉OpenAI客户端使用NormalizationResponse模型来验证大模型的响应。如果响应验证成功,则将原始类别映射到归一化类别,并返回更新后的DataFrame。如果发生任何错误,则保持原始类别不变。

通过使用大模型进行类别归一化,我们可以提高分析的一致性,并更容易识别常见的问题。

6. 事件主题分析和可视化:洞察力的提升

一旦我们有了使用大模型对各个事件进行分类的函数,我们可以将整个过程包装到一个单一的pipeline中,即 analyze_topics() 函数。该函数读取包含所有传入支持消息的CSV文件,清理HTML内容(如果需要),然后按 incident_id 对消息进行分组。对于每个组,它将清理后的消息发送到大模型,以获得一致的类别、解释和相关关键词。

import matplotlib.pyplot as plt
import seaborn as sns

def analyze_topics(file_path, use_clean_messages=False):
    """
    Analyzes the topics of messages using LLM
    """
    # Read the CSV file
    df = pd.read_csv(file_path)

    # Check if clean messages file exists and should be used
    clean_messages_file = 'clean_messages.csv'
    if use_clean_messages and os.path.exists(clean_messages_file):
        print("Loading clean messages from file...")
        df = pd.read_csv(clean_messages_file)
    else:
        print("Cleaning messages...")
        df['clean_msg'] = df['message_html'].apply(clean_html)
        # Save clean messages for future use
        df.to_csv(clean_messages_file, index=False, encoding='utf-8')

    # Convert messages to strings and ensure they're properly encoded
    df['clean_msg'] = df['clean_msg'].apply(lambda x: str(x) if pd.notnull(x) else "")

    # Group messages by incident
    messages_by_incident = {}
    for incident_id, group in df.groupby('incident_id'):
        messages_by_incident[incident_id] = {
            'clean_msg': group['clean_msg'].tolist()
        }

    partial_results = []
    batch_size = 1000

    for i, (incident_id, data) in enumerate(tqdm(messages_by_incident.items(), desc="Categorizing incidents")):
        try:
            print(f"\nProcessing incident {incident_id}")
            print("Messages to process:")
            for idx, msg in enumerate(data['clean_msg']):
                print(f"Message {idx + 1}: {msg}")

            try:
                category_info = categorize_messages(data['clean_msg'], incident_id)
            except UnicodeEncodeError as e:
                print(f"\nEncoding error in incident {incident_id}")
                print(f"Details: {str(e)}")
                continue
            except Exception as e:
                print(f"Error categorizing incident {incident_id}: {str(e)}")
                continue

            try:
                full_message = ' | '.join(data['clean_msg'])
            except UnicodeEncodeError as e:
                print(f"\nEncoding error joining messages for incident {incident_id}")
                print(f"Details: {str(e)}")
                continue

            # Ensure all values are strings
            result = {
                'incident_id': str(incident_id),
                'category': str(category_info.get('category', 'Uncategorized')),
                'explanation': str(category_info.get('explanation', '')),
                'keywords': ', '.join(str(k) for k in category_info.get('keywords', [])),
                'message': str(full_message)
            }

            partial_results.append(result)

            if (i + 1) % batch_size == 0:
                df_partial = pd.DataFrame(partial_results)
                df_partial.to_csv(f'partial_results_{i+1}.csv', index=False)
                print(f"\nSaved partial results up to incident {i+1}")

        except Exception as e:
            print(f"General error processing incident {incident_id}: {str(e)}")
            continue

    if not partial_results:
        print("No results were generated. Check the input data and error messages above.")
        return pd.DataFrame()

    try:
        df_results = pd.DataFrame(partial_results)

        print(f"Categorized {df_results.incident_id.nunique()} incidents")
        print(f"{df_results.category.nunique()} different unnormalized categories")

        total_incidents = len(df_results)
        category_distribution = (df_results['category'].value_counts() / total_incidents * 100)

        plt.figure(figsize=(12, 8))  # Aumentado el alto para mejor visualización
        sns.barplot(y=category_distribution.index, x=category_distribution.values)
        plt.title('Category Distribution of Incidents')
        plt.xlabel('Percentage of Incidents (%)')
        plt.ylabel('Category')
        plt.tight_layout()
        plt.savefig('category_distribution.png')
        plt.close()

        df_results.to_csv('final_results.csv', index=False)

        print("\nCategorization summary:")
        print(f"Total incidents analyzed: {len(df_results)}")
        print("\nCategory distribution:")
        for category, percentage in category_distribution.items():
            print(f"{category}: {percentage:.2f}%")

        return df_results

    except Exception as e:
        print(f"Error creating final results: {str(e)}")
        # Save partial results even if there was an error
        if partial_results:
            pd.DataFrame(partial_results).to_csv('error_partial_results.csv', index=False)
        return pd.DataFrame()

该函数将部分结果分批存储,优雅地处理编码错误,并生成一个已分类事件的最终数据集。它还使用条形图可视化类别的分布,以便快速发现常见问题。这一步是将数千个支持请求的定性见解转化为可操作情报的关键,帮助团队确定优先级、自动化并持续改进其响应流程。 通过 数据可视化 ,我们可以更加直观地了解事件的分布情况,例如哪些类别的事件最多,哪些类别的事件最少,从而为决策提供支持。

7. 命令行执行:简化操作流程

为了使pipeline可以从命令行执行,文章包含一个 main 块,允许使用简单的脚本调用来处理您的数据。 这使得整个分析流程更加便捷和可重复。

import argparse

if __name__ == "__main__":
    # Configure command-line arguments
    parser = argparse.ArgumentParser(description='Analyze message topics')
    parser.add_argument('--use-clean-messages', action='store_true',
                        help='Use already cleaned messages if available')
    args = parser.parse_args()

    # Replace with the path to your CSV file
    file_path = "incident_data_html.csv"

    try:
        df_results = analyze_topics(file_path, args.use_clean_messages)
        print("\nAnalysis completed. Result files have been generated.")
    except Exception as e:
        print(f"Error processing the file: {str(e)}")

此设置支持一个可选标志,用于使用先前运行中保存的预清理消息。您可以使用简单的CLI选项重新处理新数据或使用清理后的消息重新运行。 例如,可以使用 python analyze_messages.py --use-clean-messages 命令来运行分析。

总而言之,本文详细阐述了如何利用 生成式AI大模型 技术,将复杂的支持工单分类流程转化为一个高效、智能的系统。从数据清洗,到模型调用,再到结果验证和可视化,每一步都经过精心设计,旨在最大限度地提升支持团队的工作效率,并为企业决策提供有力的支持。 随着 大模型 技术的不断发展,我们有理由相信,未来的支持工单处理将会更加自动化、智能化。