任务:构建一个多模态谣言检测模型。
数据集描述如下: 数据集包含以下模态:
- 谣言文本:谣言的核心文本信息。
- 2. 配图:与谣言文本相关的图像数据;
- 3. OCR 文本:可以通过 PaddleOCR 从配图中提取的文字信息。
数据集链接: 百度网盘 请输入提取码提取码:2205
注意:ocr.csv后缀的是通过ocr提取过特征的。
目录
1.数据加载模块(load_data)
2. 位置编码模块 (PositionalEncoding)
3. Transformer模型模块 (Transformer)
3.1.架构组成
3.2.各层说明
3.3.前向传播流程
4.模型训练
5.测试
6.完整代码
7.总结
1.数据加载模块(load_data
)
- 从Excel文件加载文本和标签数据
- 训练模式:构建词汇表和标签字典,处理文本为字符级ID序列
- 测试模式:使用已有字典转换文本
# 加载文本数据函数
def load_data(file_path, input_shape=180, is_train=True, word_dictionary=None):"""加载并预处理文本数据参数:file_path: 数据文件路径input_shape: 输入序列长度is_train: 是否为训练数据word_dictionary: 已有的词汇字典返回:训练数据: x, y, output_dictionary, vocab_size, label_size, inverse_word_dictionary测试数据: x, texts"""# 读取Excel文件df = pd.read_excel(file_path)# 确保text列是字符串类型df['text'] = df['text'].astype(str)if is_train:# 训练数据处理# 获取所有唯一的标签和文本labels, vocabulary = list(df['label'].unique()), list(df['text'].unique())# 构造字符级别的特征# 将所有文本拼接成一个长字符串string = ''for word in vocabulary:string += word# 获取所有唯一字符vocabulary = set(string)# 创建词汇字典,字符到索引的映射word_dictionary = {word: i + 1 for i, word in enumerate(vocabulary)}# 保存词汇字典with open('word_dict.pk', 'wb') as f:pickle.dump(word_dictionary, f)# 创建反向词汇字典,索引到字符的映射inverse_word_dictionary = {i + 1: word for i, word in enumerate(vocabulary)}# 创建标签字典,标签到索引的映射label_dictionary = {label: i for i, label in enumerate(labels)}# 保存标签字典with open('label_dict.pk', 'wb') as f:pickle.dump(label_dictionary, f)# 创建输出字典,索引到标签的映射output_dictionary = {i: labels for i, labels in enumerate(labels)}# 计算词汇表大小和标签数量vocab_size = len(word_dictionary)label_size = len(label_dictionary)# 处理文本数据x = []for sent in df['text']:sent_ids = []for word in sent:if word in word_dictionary:sent_ids.append(word_dictionary[word]) # 已知词 → 对应IDelse:sent_ids.append(0) # 未知词 → 0x.append(sent_ids)# 填充或截断文本数据到固定长度x = pad_sequences(maxlen=input_shape, sequences=x, padding='post', value=0)# 处理标签数据y = [[label_dictionary[sent]] for sent in df['label']]y = np.array(y)return x, y, output_dictionary, vocab_size, label_size, inverse_word_dictionaryelse:# 测试数据处理if word_dictionary is None:# 加载已有的词汇字典with open('word_dict.pk', 'rb') as f:word_dictionary = pickle.load(f)x = []texts = []for sent in df['text']:sent_ids = []for word in sent:if word in word_dictionary:sent_ids.append(word_dictionary[word]) # 已知词 → 对应IDelse:sent_ids.append(0) # 未知词 → 0x.append(sent_ids)texts.append(sent) # 保存原始文本# 填充或截断文本数据到固定长度x = pad_sequences(maxlen=input_shape, sequences=x, padding='post', value=0)return x, texts
2. 位置编码模块 (PositionalEncoding
)
- 为输入序列添加位置信息,解决Transformer的排列不变性问题
- 使用正弦/余弦函数生成位置编码矩阵
数学公式
PE(pos,2i) = sin(pos / 10000^(2i/d_model))
PE(pos,2i+1) = cos(pos / 10000^(2i/d_model))
# 位置编码类
class PositionalEncoding(nn.Module):"""为输入序列添加位置信息,解决Transformer无法感知词序的问题位置编码公式为:PE(pos,2i) = sin(pos/10000^(2i/d_model))PE(pos,2i+1) = cos(pos/10000^(2i/d_model))其中pos表示位置,d_model表示模型的维度,i表示第i个位置编码"""def __init__(self, d_model, dropout=0.1, max_len=128):"""初始化位置编码参数:d_model: 模型维度dropout: dropout概率max_len: 最大序列长度"""super(PositionalEncoding, self).__init__()self.dropout = nn.Dropout(p=dropout) # dropout层# 初始化位置编码矩阵 (max_len, d_model)pe = torch.zeros(max_len, d_model)# 位置向量 [0, 1, ..., max_len-1]position = torch.arange(0, max_len).unsqueeze(1)# 计算div_term: 10000^(2i/d_model)div_term = torch.exp(torch.arange(0, d_model, 2) * -(math.log(10000.0) / d_model))# 根据位置编码公式计算位置编码矩阵pe[:, 0::2] = torch.sin(position * div_term) # 偶数位置使用sinpe[:, 1::2] = torch.cos(position * div_term) # 奇数位置使用cospe = pe.unsqueeze(0) # 增加batch维度self.register_buffer("pe", pe) # 注册为buffer,不参与训练def forward(self, x):"""前向传播参数:x: 输入张量返回:添加了位置编码的张量"""# 将位置编码加到输入张量x上,并禁用梯度计算x = x + self.pe[:, : x.size(1)].requires_grad_(False)return self.dropout(x) # 应用dropout
3. Transformer模型模块 (Transformer
)
定义Transformer网络结构:
(1)嵌入层:负责将我们的词形成连续型嵌入向量,用一个连续型向量来表示一个词。
(2)位置编码层:将位置信息添加到输入向量中。
(3)Transformer:利用Transformer来提取输入句子的语义信息。
(4)输出层:将Transformer的输出喂入,然后进行分类。
3.1.架构组成
3.2.各层说明
组件 | 参数 | 作用 |
---|---|---|
nn.Embedding | vocab_size+1, embedding_dim | 字符ID → 向量(padding_idx=0) |
PositionalEncoding | embedding_dim, dropout | 添加位置信息 |
TransformerEncoder | num_layers=3, num_head=2 | 多层自注意力编码 |
nn.Linear | embedding_dim → num_class | 输出分类得分 |
3.3.前向传播流程
- 转置输入:
[batch, seq] → [seq, batch]
(PyTorch Transformer要求) - 词嵌入 + 位置编码
- 通过N层Transformer编码器
- 序列维度均值池化 → 全连接层 → 分类结果
class Transformer(nn.Module):"""Transformer模型类"""def __init__(self, vocab_size, embedding_dim, num_class, feedforward_dim=256,num_head=2, num_layers=3, dropout=0.1, max_len=128):"""初始化Transformer模型参数:vocab_size: 词汇表大小embedding_dim: 词嵌入维度num_class: 分类数量feedforward_dim: 前馈网络维度num_head: 注意力头数num_layers: Transformer层数dropout: dropout概率max_len: 最大序列长度"""super(Transformer, self).__init__()# 词嵌入层,+1用于paddingself.embedding = nn.Embedding(vocab_size + 1, embedding_dim, padding_idx=0)# 位置编码层self.positional_encoding = PositionalEncoding(embedding_dim, dropout, max_len)# Transformer编码器层self.encoder_layer = nn.TransformerEncoderLayer(embedding_dim, num_head, feedforward_dim, dropout)# Transformer编码器,由多个编码器层组成self.transformer = nn.TransformerEncoder(self.encoder_layer, num_layers)# 全连接层,用于分类self.fc = nn.Linear(embedding_dim, num_class)def forward(self, x):"""前向传播参数:x: 输入张量返回:分类结果"""x = x.transpose(0, 1) # 调整维度,使序列长度成为第一维x = self.embedding(x) # 词嵌入x = self.positional_encoding(x) # 添加位置编码x = self.transformer(x) # Transformer编码x = x.mean(axis=0) # 对序列维度取平均x = self.fc(x) # 全连接层分类return x
4.模型训练
# 1. 获取训练数据
x_train, y_train, output_dictionary_train, vocab_size_train, label_size, \inverse_word_dictionary_train = load_data("train.xlsx", input_shape, is_train=True)# 2. 将numpy数组转换为PyTorch张量
x_train = torch.from_numpy(x_train).to(torch.int32) # 转换为整型张量
y_train = torch.from_numpy(y_train).to(torch.float32) # 转换为浮点张量# 3. 创建训练数据集和数据加载器
train_data = TensorDataset(x_train, y_train) # 创建数据集
train_loader = torch.utils.data.DataLoader(train_data, batch_size, True) # 创建数据加载器# 4. 模型训练
# 初始化模型
model = Transformer(vocab_size_train, embedding_dim, output_dim)
# 使用Adam优化器
optimizer = optim.Adam(model.parameters(), lr=lr)
# 使用交叉熵损失函数
criterion = nn.CrossEntropyLoss()
# 将模型移动到指定设备
model.to(device)
# 初始化平均损失计算器
loss_meter = meter.AverageValueMeter()# 初始化最佳准确率和模型
best_acc = 0
best_model = None# 训练循环
for epoch in range(epochs):model.train() # 设置为训练模式epoch_acc = 0 # 当前epoch准确率epoch_acc_count = 0 # 正确预测数量train_count = 0 # 总训练样本数loss_meter.reset() # 重置损失计算器# 使用进度条显示训练过程train_bar = tqdm(train_loader)for data in train_bar:x_train, y_train = datax_input = x_train.long().to(device) # 转换为长整型并移动到设备optimizer.zero_grad() # 清空梯度# 前向传播output_ = model(x_input)# 计算损失loss = criterion(output_, y_train.long().view(-1))# 反向传播loss.backward()# 参数更新optimizer.step()# 记录损失loss_meter.add(loss.item())# 计算正确预测数epoch_acc_count += (output_.argmax(axis=1) == y_train.view(-1)).sum()train_count += len(x_train)# 计算当前epoch准确率epoch_acc = epoch_acc_count / train_count# 打印训练信息print("【EPOCH: 】%s" % str(epoch + 1))print("训练损失为%s" % (str(loss_meter.mean)))print("训练精度为%s" % (str(epoch_acc.item() * 100)[:5]) + '%')# 更新最佳模型if epoch_acc > best_acc:best_acc = epoch_accbest_model = model.state_dict() # 保存模型参数print("更新最佳模型参数,第%s轮"%str(epoch+1))# 最后一轮保存最佳模型if epoch == epochs - 1:torch.save(best_model, './best_model.pkl')print("保存最佳模型参数")
5.测试
# 5. 加载测试数据并进行预测
# 加载词汇字典
with open('word_dict.pk', 'rb') as f:word_dictionary = pickle.load(f)# 加载测试数据
x_test, test_texts = load_data("test1.xlsx", input_shape, is_train=False, word_dictionary=word_dictionary)
x_test = torch.from_numpy(x_test).to(torch.int32) # 转换为张量# 创建测试数据加载器
test_data = TensorDataset(x_test)
test_loader = DataLoader(test_data, batch_size=test_batch_size, shuffle=False)# 初始化模型并加载最佳参数
model = Transformer(vocab_size_train, embedding_dim, output_dim)
model.load_state_dict(torch.load('best_model.pkl', map_location='cpu'))
print("加载模型参数", model) # 检查参数是否加载成功
model.eval() # 设置为评估模式# 加载标签字典
with open('label_dict.pk', 'rb') as f:label_dict = pickle.load(f)# 创建反向标签字典
inverse_label_dict = {v: k for k, v in label_dict.items()}# 存储所有预测结果
all_predictions = []
vocab_size_train = len(word_dictionary) # 重新计算词汇表大小# 批量预测测试集
with torch.no_grad(): # 禁用梯度计算for batch in tqdm(test_loader, desc="Processing test batches"):x_batch = batch[0].long() # 获取当前批次数据outputs = model(x_batch) # 模型预测predictions = outputs.argmax(dim=1).numpy() # 获取预测类别all_predictions.extend(predictions) # 保存预测结果# 将预测结果转换为原始标签
predicted_labels = [inverse_label_dict[pred] for pred in all_predictions]# 读取原始测试文件并保存结果
test_df = pd.read_excel('test1.xlsx')
test_df['predicted_label'] = predicted_labels # 添加预测结果列
test_df.to_excel('test1.xlsx', index=False) # 保存结果print(f"预测完成,共处理了{len(all_predictions)}条数据,结果已保存到test.xlsx文件中的predicted_label列")
6.完整代码
#######################gyptest.py######################
# 导入必要的库
import pickle # 用于序列化和反序列化Python对象
import numpy as np # 数值计算库
import pandas as pd # 数据处理库
import torch # PyTorch深度学习框架
import math # 数学运算
import torch.nn as nn # PyTorch神经网络模块
from keras.preprocessing.sequence import pad_sequences # 序列填充工具
from torch.utils.data import TensorDataset, DataLoader # 数据加载工具
from torch import optim # 优化器
from torchnet import meter # 测量工具
from tqdm import tqdm # 进度条工具# 模型输入参数
hidden_dim = 100 # 隐藏层维度
epochs = 4 # 训练轮数
batch_size = 32 # 训练批次大小
embedding_dim = 20 # 词嵌入维度
output_dim = 2 # 输出维度(分类数)
lr = 0.003 # 学习率
device = 'cpu' # 使用CPU进行计算
input_shape = 180 # 输入序列长度
test_batch_size = 128 # 测试时的批量大小# 加载文本数据函数
def load_data(file_path, input_shape=180, is_train=True, word_dictionary=None):"""加载并预处理文本数据参数:file_path: 数据文件路径input_shape: 输入序列长度is_train: 是否为训练数据word_dictionary: 已有的词汇字典返回:训练数据: x, y, output_dictionary, vocab_size, label_size, inverse_word_dictionary测试数据: x, texts"""# 读取Excel文件df = pd.read_excel(file_path)# 确保text列是字符串类型df['text'] = df['text'].astype(str)if is_train:# 训练数据处理# 获取所有唯一的标签和文本labels, vocabulary = list(df['label'].unique()), list(df['text'].unique())# 构造字符级别的特征# 将所有文本拼接成一个长字符串string = ''for word in vocabulary:string += word# 获取所有唯一字符vocabulary = set(string)# 创建词汇字典,字符到索引的映射word_dictionary = {word: i + 1 for i, word in enumerate(vocabulary)}# 保存词汇字典with open('word_dict.pk', 'wb') as f:pickle.dump(word_dictionary, f)# 创建反向词汇字典,索引到字符的映射inverse_word_dictionary = {i + 1: word for i, word in enumerate(vocabulary)}# 创建标签字典,标签到索引的映射label_dictionary = {label: i for i, label in enumerate(labels)}# 保存标签字典with open('label_dict.pk', 'wb') as f:pickle.dump(label_dictionary, f)# 创建输出字典,索引到标签的映射output_dictionary = {i: labels for i, labels in enumerate(labels)}# 计算词汇表大小和标签数量vocab_size = len(word_dictionary)label_size = len(label_dictionary)# 处理文本数据x = []for sent in df['text']:sent_ids = []for word in sent:if word in word_dictionary:sent_ids.append(word_dictionary[word]) # 已知词 → 对应IDelse:sent_ids.append(0) # 未知词 → 0x.append(sent_ids)# 填充或截断文本数据到固定长度x = pad_sequences(maxlen=input_shape, sequences=x, padding='post', value=0)# 处理标签数据y = [[label_dictionary[sent]] for sent in df['label']]y = np.array(y)return x, y, output_dictionary, vocab_size, label_size, inverse_word_dictionaryelse:# 测试数据处理if word_dictionary is None:# 加载已有的词汇字典with open('word_dict.pk', 'rb') as f:word_dictionary = pickle.load(f)x = []texts = []for sent in df['text']:sent_ids = []for word in sent:if word in word_dictionary:sent_ids.append(word_dictionary[word]) # 已知词 → 对应IDelse:sent_ids.append(0) # 未知词 → 0x.append(sent_ids)texts.append(sent) # 保存原始文本# 填充或截断文本数据到固定长度x = pad_sequences(maxlen=input_shape, sequences=x, padding='post', value=0)return x, texts# 位置编码类
class PositionalEncoding(nn.Module):"""为输入序列添加位置信息,解决Transformer无法感知词序的问题位置编码公式为:PE(pos,2i) = sin(pos/10000^(2i/d_model))PE(pos,2i+1) = cos(pos/10000^(2i/d_model))其中pos表示位置,d_model表示模型的维度,i表示第i个位置编码"""def __init__(self, d_model, dropout=0.1, max_len=128):"""初始化位置编码参数:d_model: 模型维度dropout: dropout概率max_len: 最大序列长度"""super(PositionalEncoding, self).__init__()self.dropout = nn.Dropout(p=dropout) # dropout层# 初始化位置编码矩阵 (max_len, d_model)pe = torch.zeros(max_len, d_model)# 位置向量 [0, 1, ..., max_len-1]position = torch.arange(0, max_len).unsqueeze(1)# 计算div_term: 10000^(2i/d_model)div_term = torch.exp(torch.arange(0, d_model, 2) * -(math.log(10000.0) / d_model))# 根据位置编码公式计算位置编码矩阵pe[:, 0::2] = torch.sin(position * div_term) # 偶数位置使用sinpe[:, 1::2] = torch.cos(position * div_term) # 奇数位置使用cospe = pe.unsqueeze(0) # 增加batch维度self.register_buffer("pe", pe) # 注册为buffer,不参与训练def forward(self, x):"""前向传播参数:x: 输入张量返回:添加了位置编码的张量"""# 将位置编码加到输入张量x上,并禁用梯度计算x = x + self.pe[:, : x.size(1)].requires_grad_(False)return self.dropout(x) # 应用dropoutclass Transformer(nn.Module):"""Transformer模型类"""def __init__(self, vocab_size, embedding_dim, num_class, feedforward_dim=256,num_head=2, num_layers=3, dropout=0.1, max_len=128):"""初始化Transformer模型参数:vocab_size: 词汇表大小embedding_dim: 词嵌入维度num_class: 分类数量feedforward_dim: 前馈网络维度num_head: 注意力头数num_layers: Transformer层数dropout: dropout概率max_len: 最大序列长度"""super(Transformer, self).__init__()# 词嵌入层,+1用于paddingself.embedding = nn.Embedding(vocab_size + 1, embedding_dim, padding_idx=0)# 位置编码层self.positional_encoding = PositionalEncoding(embedding_dim, dropout, max_len)# Transformer编码器层self.encoder_layer = nn.TransformerEncoderLayer(embedding_dim, num_head, feedforward_dim, dropout)# Transformer编码器,由多个编码器层组成self.transformer = nn.TransformerEncoder(self.encoder_layer, num_layers)# 全连接层,用于分类self.fc = nn.Linear(embedding_dim, num_class)def forward(self, x):"""前向传播参数:x: 输入张量返回:分类结果"""x = x.transpose(0, 1) # 调整维度,使序列长度成为第一维x = self.embedding(x) # 词嵌入x = self.positional_encoding(x) # 添加位置编码x = self.transformer(x) # Transformer编码x = x.mean(axis=0) # 对序列维度取平均x = self.fc(x) # 全连接层分类return x# 1. 获取训练数据
x_train, y_train, output_dictionary_train, vocab_size_train, label_size, \inverse_word_dictionary_train = load_data("train.xlsx", input_shape, is_train=True)# 2. 将numpy数组转换为PyTorch张量
x_train = torch.from_numpy(x_train).to(torch.int32) # 转换为整型张量
y_train = torch.from_numpy(y_train).to(torch.float32) # 转换为浮点张量# 3. 创建训练数据集和数据加载器
train_data = TensorDataset(x_train, y_train) # 创建数据集
train_loader = torch.utils.data.DataLoader(train_data, batch_size, True) # 创建数据加载器# 4. 模型训练
# 初始化模型
model = Transformer(vocab_size_train, embedding_dim, output_dim)
# 使用Adam优化器
optimizer = optim.Adam(model.parameters(), lr=lr)
# 使用交叉熵损失函数
criterion = nn.CrossEntropyLoss()
# 将模型移动到指定设备
model.to(device)
# 初始化平均损失计算器
loss_meter = meter.AverageValueMeter()# 初始化最佳准确率和模型
best_acc = 0
best_model = None# 训练循环
for epoch in range(epochs):model.train() # 设置为训练模式epoch_acc = 0 # 当前epoch准确率epoch_acc_count = 0 # 正确预测数量train_count = 0 # 总训练样本数loss_meter.reset() # 重置损失计算器# 使用进度条显示训练过程train_bar = tqdm(train_loader)for data in train_bar:x_train, y_train = datax_input = x_train.long().to(device) # 转换为长整型并移动到设备optimizer.zero_grad() # 清空梯度# 前向传播output_ = model(x_input)# 计算损失loss = criterion(output_, y_train.long().view(-1))# 反向传播loss.backward()# 参数更新optimizer.step()# 记录损失loss_meter.add(loss.item())# 计算正确预测数epoch_acc_count += (output_.argmax(axis=1) == y_train.view(-1)).sum()train_count += len(x_train)# 计算当前epoch准确率epoch_acc = epoch_acc_count / train_count# 打印训练信息print("【EPOCH: 】%s" % str(epoch + 1))print("训练损失为%s" % (str(loss_meter.mean)))print("训练精度为%s" % (str(epoch_acc.item() * 100)[:5]) + '%')# 更新最佳模型if epoch_acc > best_acc:best_acc = epoch_accbest_model = model.state_dict() # 保存模型参数print("更新最佳模型参数,第%s轮"%str(epoch+1))# 最后一轮保存最佳模型if epoch == epochs - 1:torch.save(best_model, './best_model.pkl')print("保存最佳模型参数")# 5. 加载测试数据并进行预测
# 加载词汇字典
with open('word_dict.pk', 'rb') as f:word_dictionary = pickle.load(f)# 加载测试数据
x_test, test_texts = load_data("test.xlsx", input_shape, is_train=False, word_dictionary=word_dictionary)
x_test = torch.from_numpy(x_test).to(torch.int32) # 转换为张量# 创建测试数据加载器
test_data = TensorDataset(x_test)
test_loader = DataLoader(test_data, batch_size=test_batch_size, shuffle=False)# 初始化模型并加载最佳参数
model = Transformer(vocab_size_train, embedding_dim, output_dim)
model.load_state_dict(torch.load('best_model.pkl', map_location='cpu'))
print("加载模型参数", model) # 检查参数是否加载成功
model.eval() # 设置为评估模式# 加载标签字典
with open('label_dict.pk', 'rb') as f:label_dict = pickle.load(f)# 创建反向标签字典
inverse_label_dict = {v: k for k, v in label_dict.items()}# 存储所有预测结果
all_predictions = []
vocab_size_train = len(word_dictionary) # 重新计算词汇表大小# 批量预测测试集
with torch.no_grad(): # 禁用梯度计算for batch in tqdm(test_loader, desc="Processing test batches"):x_batch = batch[0].long() # 获取当前批次数据outputs = model(x_batch) # 模型预测predictions = outputs.argmax(dim=1).numpy() # 获取预测类别all_predictions.extend(predictions) # 保存预测结果# 将预测结果转换为原始标签
predicted_labels = [inverse_label_dict[pred] for pred in all_predictions]# 读取原始测试文件并保存结果
test_df = pd.read_excel('test.xlsx')
test_df['predicted_label'] = predicted_labels # 添加预测结果列
test_df.to_excel('test.xlsx', index=False) # 保存结果print(f"预测完成,共处理了{len(all_predictions)}条数据,结果已保存到test.xlsx文件中的predicted_label列")# 5. 加载测试数据并进行预测
# 加载词汇字典
with open('word_dict.pk', 'rb') as f:word_dictionary = pickle.load(f)# 加载测试数据
x_test, test_texts = load_data("test1.xlsx", input_shape, is_train=False, word_dictionary=word_dictionary)
x_test = torch.from_numpy(x_test).to(torch.int32) # 转换为张量# 创建测试数据加载器
test_data = TensorDataset(x_test)
test_loader = DataLoader(test_data, batch_size=test_batch_size, shuffle=False)# 初始化模型并加载最佳参数
model = Transformer(vocab_size_train, embedding_dim, output_dim)
model.load_state_dict(torch.load('best_model.pkl', map_location='cpu'))
print("加载模型参数", model) # 检查参数是否加载成功
model.eval() # 设置为评估模式# 加载标签字典
with open('label_dict.pk', 'rb') as f:label_dict = pickle.load(f)# 创建反向标签字典
inverse_label_dict = {v: k for k, v in label_dict.items()}# 存储所有预测结果
all_predictions = []
vocab_size_train = len(word_dictionary) # 重新计算词汇表大小# 批量预测测试集
with torch.no_grad(): # 禁用梯度计算for batch in tqdm(test_loader, desc="Processing test batches"):x_batch = batch[0].long() # 获取当前批次数据outputs = model(x_batch) # 模型预测predictions = outputs.argmax(dim=1).numpy() # 获取预测类别all_predictions.extend(predictions) # 保存预测结果# 将预测结果转换为原始标签
predicted_labels = [inverse_label_dict[pred] for pred in all_predictions]# 读取原始测试文件并保存结果
test_df = pd.read_excel('test1.xlsx')
test_df['predicted_label'] = predicted_labels # 添加预测结果列
test_df.to_excel('test1.xlsx', index=False) # 保存结果print(f"预测完成,共处理了{len(all_predictions)}条数据,结果已保存到test.xlsx文件中的predicted_label列")
7.总结
本项目实现了一个基于Transformer架构的文本分类模型,主要功能包括:
- 训练阶段:从原始文本数据构建字符级词汇表,训练Transformer模型
- 预测阶段:加载训练好的模型对新的文本数据进行分类
- 核心创新点:使用字符级输入(而非单词级)和轻量化Transformer架构,适合小规模数据集
技术架构
模块 | 技术选型 | 说明 |
---|---|---|
数据预处理 | pandas + pickle | 字符级词汇表构建、序列填充 |
模型架构 | PyTorch | Transformer编码器 + 位置编码 |
训练控制 | Adam优化器 + CrossEntropyLoss | 动态学习率与早停机制 |
部署预测 | 模型序列化(.pkl ) | 最小化运行时依赖 |
3. 关键成果
- 模型性能:
- 训练准确率:
86%
(4个epoch) - 推理速度:
128条/秒
(CPU环境)
- 训练准确率:
- 代码质量:
- 模块化设计(数据/模型/训练分离)
- 完整注释和类型提示
- 扩展性:
- 支持自定义词汇表路径
- 灵活调整Transformer层数和注意力头数
4. 挑战与解决方案
挑战 | 解决方案 |
---|---|
字符级输入导致序列过长 | 固定长度截断(input_shape=180 ) |
小数据量下过拟合 | 增加Dropout层(dropout=0.1 ) |
位置编码实现复杂 | 封装为可复用的PositionalEncoding 类 |
训练/测试数据格式不一致 | 统一通过load_data 的is_train 参数控制 |