支持团队每天都会收到来自用户的数千条非结构化、未分类的消息。如果没有明确的方法来对这些问题进行分类,就很难确定紧急情况的优先级,将其路由到正确的部门,或识别重复出现的问题。手动分类既耗时又容易出错。但如果我们能使用像 OpenAI 的模型这样的 大模型 (LLM) 来阅读消息线程,理解其潜在的主题,并自动建议一致的类别,会怎么样呢? 本文将探讨如何实现这一目标,主要包括清洗HTML消息,搭建OpenAI客户端,使用Pydantic验证LLM响应,利用OpenAI进行事件消息分类,以及进行类别归一化,最终对事件主题进行分析和可视化。 通过这些步骤,我们可以将生成式AI的力量应用于支持工单的处理流程,极大地提升效率和准确性。
1. 清洗HTML消息:为AI提供干净的数据
支持工单数据往往包含大量的HTML标签,这些标签对于大模型理解文本内容毫无帮助,反而会增加噪声。因此,清洗HTML消息是至关重要的第一步。我们需要从混乱的HTML文档中提取出干净、可读的文本,以便后续的AI处理。
文章中使用BeautifulSoup库来实现HTML清洗。BeautifulSoup是一个强大的Python库,专门用于解析HTML和XML文档。它可以方便地提取HTML标签中的文本内容,并去除不必要的HTML标签。
例如,以下代码展示了如何使用BeautifulSoup清洗HTML消息:
from bs4 import BeautifulSoup
import pandas as pd
import re
def clean_html(text):
"""
Removes HTML tags and extra whitespace from a message.
"""
if pd.isna(text):
return ""
# Use BeautifulSoup to parse and extract text
soup = BeautifulSoup(text, 'html.parser')
clean_text = soup.get_text()
# Normalize whitespace
clean_text = re.sub(r'\s+', ' ', clean_text)
clean_text = clean_text.strip()
return clean_text
这段代码首先检查输入文本是否为空。如果为空,则直接返回空字符串。否则,它使用BeautifulSoup
解析HTML文本,然后使用get_text()
方法提取文本内容。最后,它使用正则表达式去除多余的空白字符,并返回清洗后的文本。
清洗HTML消息不仅可以提高大模型的分类准确性,还可以减少计算资源的使用,因为模型需要处理的数据量更少。
2. 搭建OpenAI客户端:连接AI的力量
要使用生成式AI模型,例如OpenAI的模型,来对事件消息进行分类,我们需要连接到OpenAI的API。文章推荐使用python-dotenv
库来安全地存储API密钥。
python-dotenv
允许我们将API密钥存储在.env
文件中,而不是直接硬编码在代码中。这提高了安全性,因为API密钥不会被意外地提交到代码仓库。
以下代码展示了如何使用python-dotenv
加载API密钥并初始化OpenAI客户端:
import os
from dotenv import load_dotenv
from openai import OpenAI
# Load environment variables from a .env file
load_dotenv()
# Initialize OpenAI client using your API key
client = OpenAI(api_key=os.getenv("OPENAI_API_KEY"))
这段代码首先使用load_dotenv()
函数从.env
文件中加载环境变量。然后,它使用os.getenv()
函数获取OPENAI_API_KEY
环境变量的值,并使用该值初始化OpenAI客户端。
通过搭建OpenAI客户端,我们可以轻松地与OpenAI的大模型进行交互,并利用它们强大的文本处理能力。
3. Pydantic验证LLM响应:确保数据质量
大模型虽然强大,但有时会产生与预期格式略有偏差的输出。为了确保数据质量,我们需要验证大模型的响应。文章介绍了使用Pydantic库来自动验证LLM响应的方法。
Pydantic是一个Python库,用于数据验证和设置管理,它使用Python类型注解。它允许我们使用标准的Python类和类型提示来定义模式(或“模型”),然后自动验证传入的数据是否与预期的结构匹配。
例如,以下代码定义了两个Pydantic模型:
from pydantic import BaseModel, ValidationError
from typing import List, Dict
class CategorizationResult(BaseModel):
category: str
explanation: str
keywords: List[str]
class NormalizationResponse(BaseModel):
normalized_categories: List[str]
categories_mapping: Dict[str, str]
explanations: Dict[str, str]
CategorizationResult
模型用于验证大模型分类事件消息的响应,它包含category
(类别名称)、explanation
(类别解释)和keywords
(关键词列表)三个字段。NormalizationResponse
模型用于验证类别归一化的响应,它包含normalized_categories
(归一化类别列表)、categories_mapping
(原始类别到归一化类别的映射)和explanations
(归一化类别的解释)三个字段。
使用Pydantic验证大模型的响应可以避免手动解析JSON数据和处理字符串,从而简化代码并提高代码的健壮性。如果响应不符合Pydantic模型的定义,Pydantic会抛出一个ValidationError
异常,我们可以捕获该异常并采取适当的措施,例如返回一个默认值。
4. 利用OpenAI进行事件消息分类:智能标签的生成
文章的核心在于如何利用生成式AI,特别是OpenAI的大模型,来对事件消息进行分类。这涉及到构建合适的Prompt,发送给大模型,并解析返回结果。
文章中提供了一个示例Prompt,该Prompt指示大模型分析与特定事件相关的消息,并确定它们所属的主要类别。该Prompt还包含一些重要的规则,例如:
- 使用一致且规范化的类别名称。
- 避免创建过于相似的类别。
- 避免创建单例类别。
- 避免使用通用术语,例如“事件”。
- 如果找不到明确的类别,则使用“未分类”。
以下代码展示了如何使用OpenAI客户端和Prompt对事件消息进行分类:
def categorize_messages(messages, incident_id):
"""
Categorizes a list of messages related to an incident using an LLM.
The model is prompted to identify the main topic discussed across the messages
and respond with a normalized category, a short explanation, and relevant keywords.
Validates the response structure using a Pydantic schema.
"""
# Construct the prompt for the language model
prompt = my_prompt
try:
response = client.beta.chat.completions.parse(
model="gpt-4.1",
messages=[
{
"role": "system",
"content": (
"You are an expert in message classification. Your task is to analyze "
"incident-related messages and group them under consistent, normalized categories. "
"Respond ONLY with a valid JSON object matching the exact structure requested."
)
},
{"role": "user", "content": prompt}
],
temperature=0.5,
max_tokens=500,
response_format=CategorizationResult,
)
result: CategorizationResult = response.choices[0].message.parsed
return result.model_dump()
except Exception as e:
return {
"category": "Uncategorized",
"explanation": f"Error: {str(e)}",
"keywords": []
}
这段代码首先构建一个Prompt,该Prompt包含要分类的事件消息。然后,它使用OpenAI客户端的chat.completions.parse()
方法将Prompt发送给大模型。response_format=CategorizationResult
参数告诉OpenAI客户端使用CategorizationResult
模型来验证大模型的响应。如果响应验证成功,则将其转换为字典并返回。如果发生任何错误,则返回一个包含“未分类”类别的默认值。
使用大模型进行事件消息分类可以大大提高分类的效率和准确性,因为大模型可以理解消息的上下文和意图,而不仅仅是关键词。
5. 类别归一化:提高分析一致性
在对所有消息进行单独分类后,我们经常会遇到非常相似的标签,但实际上它们指的是同一类型的问题。为了提高一致性并减少分析中的噪音,我们需要对相似的类别进行归一化和分组。文章介绍了使用大模型来进行语义聚类的方法。
文章中提供了一个示例Prompt,该Prompt指示大模型分析类别及其解释和关键词,并将语义相似或等效的类别分组。该Prompt还包含一些重要的规则,例如:
- 将类别总数减少到最多5个。
- 保持语义清晰度和上下文。
以下代码展示了如何使用OpenAI客户端和Prompt对类别进行归一化:
def normalize_categories(df_results):
"""
Normalizes and groups similar categories to a maximum of 5
using both the category name and its explanation/keywords for better context.
"""
print("\nNormalizing and grouping similar categories...")
# Build a dictionary with categories and their context
categories_with_context = {}
for _, row in df_results.iterrows():
if row['category'] not in categories_with_context:
keywords = row['keywords']
if isinstance(keywords, list):
keywords = ', '.join(keywords)
elif not isinstance(keywords, str):
keywords = str(keywords)
categories_with_context[row['category']] = {
'explanation': row['explanation'],
'keywords': keywords
}
# Build the prompt for OpenAI
prompt = my_second_prompt
try:
response = client.beta.chat.completions.parse(
model="gpt-4.1",
messages=[
{
"role": "system",
"content": "You are an expert in text classification and category normalization. Group semantically similar categories based on their name, explanation, and keywords."
},
{
"role": "user",
"content": prompt
}
],
temperature=0.5,
max_tokens=2000,
response_format=NormalizationResponse
)
result: NormalizationResponse = completion.choices[0].message.parsed
# Apply the mapping to normalize the categories
df_results['normalized_categories'] = df_results['category'].map(result['categories_mapping'])
df_results['normalized_explanation'] = df_results['normalized_category'].map(result['explanations'])
# Fallback: keep original if normalization failed
df_results['normalized_category'] = df_results['normalized_category'].fillna(df_results['category'])
df_results['normalized_explanation'] = df_results['normalized_explanation'].fillna(df_results['explanations'])
print(f"Categories after normalization: {len(validated.normalized_categories)}")
print("\nCategory mapping:")
for original, normalized in validated.categories_mapping.items():
print(f"{original} -> {normalized}")
return df_results
except ValidationError as ve:
print("Validation error from Pydantic:")
print(ve)
except Exception as e:
print(f"Error normalizing categories: {str(e)}")
print(f"Response that caused the error: {response_text if 'response_text' in locals() else 'Unavailable'}")
# Fallback in case of any failure
df_results['normalized_category'] = df_results['category']
df_results['normalized_explanation'] = df_results['explanation']
return df_results
这段代码首先构建一个包含类别及其上下文的字典。然后,它使用OpenAI客户端的chat.completions.parse()
方法将Prompt发送给大模型。response_format=NormalizationResponse
参数告诉OpenAI客户端使用NormalizationResponse
模型来验证大模型的响应。如果响应验证成功,则将原始类别映射到归一化类别,并返回更新后的DataFrame。如果发生任何错误,则保持原始类别不变。
通过使用大模型进行类别归一化,我们可以提高分析的一致性,并更容易识别常见的问题。
6. 事件主题分析和可视化:洞察力的提升
一旦我们有了使用大模型对各个事件进行分类的函数,我们可以将整个过程包装到一个单一的pipeline中,即 analyze_topics()
函数。该函数读取包含所有传入支持消息的CSV文件,清理HTML内容(如果需要),然后按 incident_id 对消息进行分组。对于每个组,它将清理后的消息发送到大模型,以获得一致的类别、解释和相关关键词。
import matplotlib.pyplot as plt
import seaborn as sns
def analyze_topics(file_path, use_clean_messages=False):
"""
Analyzes the topics of messages using LLM
"""
# Read the CSV file
df = pd.read_csv(file_path)
# Check if clean messages file exists and should be used
clean_messages_file = 'clean_messages.csv'
if use_clean_messages and os.path.exists(clean_messages_file):
print("Loading clean messages from file...")
df = pd.read_csv(clean_messages_file)
else:
print("Cleaning messages...")
df['clean_msg'] = df['message_html'].apply(clean_html)
# Save clean messages for future use
df.to_csv(clean_messages_file, index=False, encoding='utf-8')
# Convert messages to strings and ensure they're properly encoded
df['clean_msg'] = df['clean_msg'].apply(lambda x: str(x) if pd.notnull(x) else "")
# Group messages by incident
messages_by_incident = {}
for incident_id, group in df.groupby('incident_id'):
messages_by_incident[incident_id] = {
'clean_msg': group['clean_msg'].tolist()
}
partial_results = []
batch_size = 1000
for i, (incident_id, data) in enumerate(tqdm(messages_by_incident.items(), desc="Categorizing incidents")):
try:
print(f"\nProcessing incident {incident_id}")
print("Messages to process:")
for idx, msg in enumerate(data['clean_msg']):
print(f"Message {idx + 1}: {msg}")
try:
category_info = categorize_messages(data['clean_msg'], incident_id)
except UnicodeEncodeError as e:
print(f"\nEncoding error in incident {incident_id}")
print(f"Details: {str(e)}")
continue
except Exception as e:
print(f"Error categorizing incident {incident_id}: {str(e)}")
continue
try:
full_message = ' | '.join(data['clean_msg'])
except UnicodeEncodeError as e:
print(f"\nEncoding error joining messages for incident {incident_id}")
print(f"Details: {str(e)}")
continue
# Ensure all values are strings
result = {
'incident_id': str(incident_id),
'category': str(category_info.get('category', 'Uncategorized')),
'explanation': str(category_info.get('explanation', '')),
'keywords': ', '.join(str(k) for k in category_info.get('keywords', [])),
'message': str(full_message)
}
partial_results.append(result)
if (i + 1) % batch_size == 0:
df_partial = pd.DataFrame(partial_results)
df_partial.to_csv(f'partial_results_{i+1}.csv', index=False)
print(f"\nSaved partial results up to incident {i+1}")
except Exception as e:
print(f"General error processing incident {incident_id}: {str(e)}")
continue
if not partial_results:
print("No results were generated. Check the input data and error messages above.")
return pd.DataFrame()
try:
df_results = pd.DataFrame(partial_results)
print(f"Categorized {df_results.incident_id.nunique()} incidents")
print(f"{df_results.category.nunique()} different unnormalized categories")
total_incidents = len(df_results)
category_distribution = (df_results['category'].value_counts() / total_incidents * 100)
plt.figure(figsize=(12, 8)) # Aumentado el alto para mejor visualización
sns.barplot(y=category_distribution.index, x=category_distribution.values)
plt.title('Category Distribution of Incidents')
plt.xlabel('Percentage of Incidents (%)')
plt.ylabel('Category')
plt.tight_layout()
plt.savefig('category_distribution.png')
plt.close()
df_results.to_csv('final_results.csv', index=False)
print("\nCategorization summary:")
print(f"Total incidents analyzed: {len(df_results)}")
print("\nCategory distribution:")
for category, percentage in category_distribution.items():
print(f"{category}: {percentage:.2f}%")
return df_results
except Exception as e:
print(f"Error creating final results: {str(e)}")
# Save partial results even if there was an error
if partial_results:
pd.DataFrame(partial_results).to_csv('error_partial_results.csv', index=False)
return pd.DataFrame()
该函数将部分结果分批存储,优雅地处理编码错误,并生成一个已分类事件的最终数据集。它还使用条形图可视化类别的分布,以便快速发现常见问题。这一步是将数千个支持请求的定性见解转化为可操作情报的关键,帮助团队确定优先级、自动化并持续改进其响应流程。 通过 数据可视化 ,我们可以更加直观地了解事件的分布情况,例如哪些类别的事件最多,哪些类别的事件最少,从而为决策提供支持。
7. 命令行执行:简化操作流程
为了使pipeline可以从命令行执行,文章包含一个 main
块,允许使用简单的脚本调用来处理您的数据。 这使得整个分析流程更加便捷和可重复。
import argparse
if __name__ == "__main__":
# Configure command-line arguments
parser = argparse.ArgumentParser(description='Analyze message topics')
parser.add_argument('--use-clean-messages', action='store_true',
help='Use already cleaned messages if available')
args = parser.parse_args()
# Replace with the path to your CSV file
file_path = "incident_data_html.csv"
try:
df_results = analyze_topics(file_path, args.use_clean_messages)
print("\nAnalysis completed. Result files have been generated.")
except Exception as e:
print(f"Error processing the file: {str(e)}")
此设置支持一个可选标志,用于使用先前运行中保存的预清理消息。您可以使用简单的CLI选项重新处理新数据或使用清理后的消息重新运行。 例如,可以使用 python analyze_messages.py --use-clean-messages
命令来运行分析。
总而言之,本文详细阐述了如何利用 生成式AI 和 大模型 技术,将复杂的支持工单分类流程转化为一个高效、智能的系统。从数据清洗,到模型调用,再到结果验证和可视化,每一步都经过精心设计,旨在最大限度地提升支持团队的工作效率,并为企业决策提供有力的支持。 随着 大模型 技术的不断发展,我们有理由相信,未来的支持工单处理将会更加自动化、智能化。