BERT, ALBERT, RoBERTa, XLNet
1. Bayes' formula
import math
import jieba
import re
import os
import json
from collections import defaultdict

jieba.initialize()

"""
Naive Bayes classification in practice
P(A|B) = (P(A) * P(B|A)) / P(B)
Event A: the text belongs to class x1. The probability that a text belongs to class x1 is written P(x1).
Event B: the text is s (s = w1 w2 w3 ... wn)
P(x1|s) = probability that text s belongs to class x1   <- what we want to compute
P(x1|s) = P(x1|w1, w2, ..., wn) = P(w1, w2, ..., wn|x1) * P(x1) / P(w1, w2, ..., wn)
P(x1): probability that an arbitrary sample belongs to x1 = (number of x1 samples) / (total number of samples)
P(w1, w2, ..., wn|x1) = P(w1|x1) * P(w2|x1) * ... * P(wn|x1)   (word-independence assumption)
P(w1|x1): frequency of w1 within samples of class x1
The shared denominator is computed with the law of total probability:
P(w1, w2, ..., wn) = P(w1,...,wn|x1)*P(x1) + P(w1,...,wn|x2)*P(x2) + ... + P(w1,...,wn|xn)*P(xn)
"""

class BayesApproach:
    def __init__(self, data_path):
        self.p_class = defaultdict(int)
        self.word_class_prob = defaultdict(dict)
        self.load(data_path)

    def load(self, path):
        self.class_name_to_word_freq = defaultdict(dict)
        self.all_words = set()  # global vocabulary
        with open(path, encoding="utf8") as f:
            for line in f:
                line = json.loads(line)
                class_name = line["tag"]
                title = line["title"]
                words = jieba.lcut(title)
                self.all_words = self.all_words.union(set(words))
                self.p_class[class_name] += 1  # count samples per class
                word_freq = self.class_name_to_word_freq[class_name]
                # count word frequency per class
                for word in words:
                    if word not in word_freq:
                        word_freq[word] = 1
                    else:
                        word_freq[word] += 1
        self.freq_to_prob()
        return

    # convert the recorded word counts and sample counts into probabilities
    def freq_to_prob(self):
        # class priors
        total_sample_count = sum(self.p_class.values())
        self.p_class = dict([c, self.p_class[c] / total_sample_count] for c in self.p_class)
        # word probabilities
        self.word_class_prob = defaultdict(dict)
        for class_name, word_freq in self.class_name_to_word_freq.items():
            total_word_count = sum(count for count in word_freq.values())  # total word count of this class
            for word in word_freq:
                # add-1 smoothing to avoid zero probabilities; computes P(wn|x1)
                prob = (word_freq[word] + 1) / (total_word_count + len(self.all_words))
                self.word_class_prob[class_name][word] = prob
            self.word_class_prob[class_name]["<unk>"] = 1 / (total_word_count + len(self.all_words))
        return

    # P(w1|x1) * P(w2|x1) * ... * P(wn|x1)
    def get_words_class_prob(self, words, class_name):
        result = 1
        for word in words:
            unk_prob = self.word_class_prob[class_name]["<unk>"]
            result *= self.word_class_prob[class_name].get(word, unk_prob)
        return result

    # compute P(w1, w2, ..., wn|x1) * P(x1)
    def get_class_prob(self, words, class_name):
        # P(x1)
        p_x = self.p_class[class_name]
        # P(w1, w2, ..., wn|x1) = P(w1|x1) * P(w2|x1) * ... * P(wn|x1)
        p_w_x = self.get_words_class_prob(words, class_name)
        return p_x * p_w_x

    # classify a piece of text
    def classify(self, sentence):
        words = jieba.lcut(sentence)  # word segmentation
        results = []
        for class_name in self.p_class:
            prob = self.get_class_prob(words, class_name)  # probability of class_name
            results.append([class_name, prob])
        results = sorted(results, key=lambda x: x[1], reverse=True)  # sort by probability
        # shared denominator: P(w1,...,wn) = P(w1,...,wn|x1)*P(x1) + P(w1,...,wn|x2)*P(x2) + ... + P(w1,...,wn|xn)*P(xn)
        # skipping this step does not change the ranking; it only normalizes the scores into 0-1 probabilities
        pw = sum([x[1] for x in results])  # P(w1, w2, ..., wn)
        results = [[c, prob / pw] for c, prob in results]
        # print the results
        for class_name, prob in results:
            print("Probability of class [%s]: %f" % (class_name, prob))
        return results

if __name__ == "__main__":
    path = "../data/train_tag_news.json"
    ba = BayesApproach(path)
    query = "中国三款导弹可发射多弹头 美无法防御很急躁"
    ba.classify(query)
2. Support Vector Machine (SVM)
Linear:
#!/usr/bin/env python3
#coding: utf-8

# a classifier built on word vectors
# compare the performance of several models
import json
import jieba
import numpy as np
from gensim.models import Word2Vec
from sklearn.metrics import classification_report
from sklearn.svm import SVC
from collections import defaultdict

LABELS = {'健康': 0, '军事': 1, '房产': 2, '社会': 3, '国际': 4, '旅游': 5, '彩票': 6, '时尚': 7, '文化': 8, '汽车': 9, '体育': 10, '家居': 11, '教育': 12, '娱乐': 13, '科技': 14, '股票': 15, '游戏': 16, '财经': 17}

# input: path of the model file
# load a trained word2vec model
def load_word2vec_model(path):
    model = Word2Vec.load(path)
    return model

# load the dataset
def load_sentence(path, model):
    sentences = []
    labels = []
    with open(path, encoding="utf8") as f:
        for line in f:
            line = json.loads(line)
            title, content = line["title"], line["content"]
            sentences.append(" ".join(jieba.lcut(title)))
            labels.append(line["tag"])
    train_x = sentences_to_vectors(sentences, model)
    train_y = label_to_label_index(labels)
    return train_x, train_y

# map tag strings to class indices
def label_to_label_index(labels):
    return [LABELS[y] for y in labels]

# vectorize the texts, using word vectors trained on these same texts
def sentences_to_vectors(sentences, model):
    vectors = []
    for sentence in sentences:
        words = sentence.split()
        vector = np.zeros(model.vector_size)
        for word in words:
            try:
                vector += model.wv[word]
                # vector = np.max([vector, model.wv[word]], axis=0)
            except KeyError:
                vector += np.zeros(model.vector_size)
        vectors.append(vector / len(words))
    return np.array(vectors)

def main():
    model = load_word2vec_model("model.w2v")
    train_x, train_y = load_sentence("../data/train_tag_news.json", model)
    test_x, test_y = load_sentence("../data/valid_tag_news.json", model)
    classifier = SVC()
    classifier.fit(train_x, train_y)
    y_pred = classifier.predict(test_x)
    print(classification_report(test_y, y_pred))

if __name__ == "__main__":
    main()
Kernel functions:
Assume there exists a feature map ϕ such that K(x, y) = ϕ(x)·ϕ(y). The kernel trick computes this inner product directly through K(x, y), without ever knowing or evaluating ϕ(x) explicitly. In effect, a kernel implicitly maps the data from a low-dimensional space into a higher-dimensional one where the classes can be separated.
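A minimal check of this identity (my own example, not part of the course code): for the degree-2 polynomial kernel K(x, y) = (x·y)² in 2-D, an explicit feature map is ϕ(x) = (x1², √2·x1·x2, x2²), and the two computations give the same number.

import numpy as np

def poly_kernel(x, y):
    # K(x, y) = (x . y)^2, computed without any explicit feature map
    return np.dot(x, y) ** 2

def phi(x):
    # explicit feature map of the degree-2 polynomial kernel in 2-D
    return np.array([x[0] ** 2, np.sqrt(2) * x[0] * x[1], x[1] ** 2])

x = np.array([1.0, 2.0])
y = np.array([3.0, 0.5])
print(poly_kernel(x, y))        # 16.0
print(np.dot(phi(x), phi(y)))   # 16.0 -> same value, but phi lives in 3-D

For reference, the SVC() used in the script above defaults to the RBF kernel; passing kernel="linear" reproduces the linear case.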
3. CNN (convolutional neural network)
There is a small pitfall here: the number of bias values equals the number of kernels. The example below uses 8 kernels, each a 6x2 matrix. A kernel is multiplied element-wise with a 6x2 window of the 6x8 input and summed to a single number, and then that kernel's own bias is added (the i-th kernel uses the i-th bias). Sliding the window along the length gives 8 - 2 + 1 = 7 positions, so one kernel produces a 1x7 row; with 8 kernels the output is an 8x7 matrix. Note that the number of kernels (8) has nothing to do with the input length (8).
import torch
import torch.nn as nn
import numpy as np

# use pytorch's 1-D convolution layer
input_dim = 6
hidden_size = 8
kernel_size = 2
torch_cnn1d = nn.Conv1d(input_dim, hidden_size, kernel_size)
for key, weight in torch_cnn1d.state_dict().items():
    print(key, weight.shape)

x = torch.rand((6, 8))  # embedding_size * max_length

def numpy_cnn1d(x, state_dict):
    weight = state_dict["weight"].numpy()
    bias = state_dict["bias"].numpy()
    sequence_output = []
    for i in range(0, x.shape[1] - kernel_size + 1):
        window = x[:, i:i+kernel_size]
        kernel_outputs = []
        for kernel in weight:
            kernel_outputs.append(np.sum(kernel * window))
        sequence_output.append(np.array(kernel_outputs) + bias)
    return np.array(sequence_output).T

print(x.shape)
print(torch_cnn1d(x.unsqueeze(0)))
print(torch_cnn1d(x.unsqueeze(0)).shape)
print(numpy_cnn1d(x.numpy(), torch_cnn1d.state_dict()))
4. LSTM
import torch
import torch.nn as nn
import numpy as np

'''
Re-implement some basic model structures with plain matrix operations.
Knowing the computation details helps deepen the understanding of a model and helps with tasks such as model conversion.
'''

# construct an input
length = 6
input_dim = 12
hidden_size = 7
x = np.random.random((length, input_dim))
# print(x)

# use pytorch's LSTM layer
torch_lstm = nn.LSTM(input_dim, hidden_size, batch_first=True)
for key, weight in torch_lstm.state_dict().items():
    print(key, weight.shape)

def sigmoid(x):
    return 1/(1 + np.exp(-x))

# take the weights out of the pytorch LSTM and reproduce its computation with numpy matrix operations
def numpy_lstm(x, state_dict):
    weight_ih = state_dict["weight_ih_l0"].numpy()
    weight_hh = state_dict["weight_hh_l0"].numpy()
    bias_ih = state_dict["bias_ih_l0"].numpy()
    bias_hh = state_dict["bias_hh_l0"].numpy()
    # pytorch stores the weights of the four gates concatenated; split them apart
    w_i_x, w_f_x, w_c_x, w_o_x = weight_ih[0:hidden_size, :], \
                                 weight_ih[hidden_size:hidden_size*2, :], \
                                 weight_ih[hidden_size*2:hidden_size*3, :], \
                                 weight_ih[hidden_size*3:hidden_size*4, :]
    w_i_h, w_f_h, w_c_h, w_o_h = weight_hh[0:hidden_size, :], \
                                 weight_hh[hidden_size:hidden_size * 2, :], \
                                 weight_hh[hidden_size * 2:hidden_size * 3, :], \
                                 weight_hh[hidden_size * 3:hidden_size * 4, :]
    b_i_x, b_f_x, b_c_x, b_o_x = bias_ih[0:hidden_size], \
                                 bias_ih[hidden_size:hidden_size * 2], \
                                 bias_ih[hidden_size * 2:hidden_size * 3], \
                                 bias_ih[hidden_size * 3:hidden_size * 4]
    b_i_h, b_f_h, b_c_h, b_o_h = bias_hh[0:hidden_size], \
                                 bias_hh[hidden_size:hidden_size * 2], \
                                 bias_hh[hidden_size * 2:hidden_size * 3], \
                                 bias_hh[hidden_size * 3:hidden_size * 4]
    w_i = np.concatenate([w_i_h, w_i_x], axis=1)
    w_f = np.concatenate([w_f_h, w_f_x], axis=1)
    w_c = np.concatenate([w_c_h, w_c_x], axis=1)
    w_o = np.concatenate([w_o_h, w_o_x], axis=1)
    b_f = b_f_h + b_f_x
    b_i = b_i_h + b_i_x
    b_c = b_c_h + b_c_x
    b_o = b_o_h + b_o_x
    c_t = np.zeros((1, hidden_size))
    h_t = np.zeros((1, hidden_size))
    sequence_output = []
    for x_t in x:
        x_t = x_t[np.newaxis, :]
        hx = np.concatenate([h_t, x_t], axis=1)
        # f_t = sigmoid(np.dot(x_t, w_f_x.T) + b_f_x + np.dot(h_t, w_f_h.T) + b_f_h)
        f_t = sigmoid(np.dot(hx, w_f.T) + b_f)
        # i_t = sigmoid(np.dot(x_t, w_i_x.T) + b_i_x + np.dot(h_t, w_i_h.T) + b_i_h)
        i_t = sigmoid(np.dot(hx, w_i.T) + b_i)
        # g = np.tanh(np.dot(x_t, w_c_x.T) + b_c_x + np.dot(h_t, w_c_h.T) + b_c_h)
        g = np.tanh(np.dot(hx, w_c.T) + b_c)
        c_t = f_t * c_t + i_t * g
        # o_t = sigmoid(np.dot(x_t, w_o_x.T) + b_o_x + np.dot(h_t, w_o_h.T) + b_o_h)
        o_t = sigmoid(np.dot(hx, w_o.T) + b_o)
        h_t = o_t * np.tanh(c_t)
        sequence_output.append(h_t)
    return np.array(sequence_output), (h_t, c_t)

torch_sequence_output, (torch_h, torch_c) = torch_lstm(torch.Tensor([x]))
numpy_sequence_output, (numpy_h, numpy_c) = numpy_lstm(x, torch_lstm.state_dict())

print(torch_sequence_output)
print(numpy_sequence_output)
print("--------")
print(torch_h)
print(numpy_h)
print("--------")
print(torch_c)
print(numpy_c)

#############################################################

# use pytorch's GRU layer
torch_gru = nn.GRU(input_dim, hidden_size, batch_first=True)
# for key, weight in torch_gru.state_dict().items():
#     print(key, weight.shape)

# take the weights out of the pytorch GRU and reproduce its computation with numpy matrix operations
def numpy_gru(x, state_dict):
    weight_ih = state_dict["weight_ih_l0"].numpy()
    weight_hh = state_dict["weight_hh_l0"].numpy()
    bias_ih = state_dict["bias_ih_l0"].numpy()
    bias_hh = state_dict["bias_hh_l0"].numpy()
    # pytorch stores the weights of the three gates concatenated; split them apart
    w_r_x, w_z_x, w_x = weight_ih[0:hidden_size, :], \
                        weight_ih[hidden_size:hidden_size * 2, :], \
                        weight_ih[hidden_size * 2:hidden_size * 3, :]
    w_r_h, w_z_h, w_h = weight_hh[0:hidden_size, :], \
                        weight_hh[hidden_size:hidden_size * 2, :], \
                        weight_hh[hidden_size * 2:hidden_size * 3, :]
    b_r_x, b_z_x, b_x = bias_ih[0:hidden_size], \
                        bias_ih[hidden_size:hidden_size * 2], \
                        bias_ih[hidden_size * 2:hidden_size * 3]
    b_r_h, b_z_h, b_h = bias_hh[0:hidden_size], \
                        bias_hh[hidden_size:hidden_size * 2], \
                        bias_hh[hidden_size * 2:hidden_size * 3]
    w_z = np.concatenate([w_z_h, w_z_x], axis=1)
    w_r = np.concatenate([w_r_h, w_r_x], axis=1)
    b_z = b_z_h + b_z_x
    b_r = b_r_h + b_r_x
    h_t = np.zeros((1, hidden_size))
    sequence_output = []
    for x_t in x:
        x_t = x_t[np.newaxis, :]
        hx = np.concatenate([h_t, x_t], axis=1)
        z_t = sigmoid(np.dot(hx, w_z.T) + b_z)
        r_t = sigmoid(np.dot(hx, w_r.T) + b_r)
        h = np.tanh(r_t * (np.dot(h_t, w_h.T) + b_h) + np.dot(x_t, w_x.T) + b_x)
        h_t = (1 - z_t) * h + z_t * h_t
        sequence_output.append(h_t)
    return np.array(sequence_output), h_t

# torch_sequence_output, torch_h = torch_gru(torch.Tensor([x]))
# numpy_sequence_output, numpy_h = numpy_gru(x, torch_gru.state_dict())
#
# print(torch_sequence_output)
# print(numpy_sequence_output)
# print("--------")
# print(torch_h)
# print(numpy_h)
5. BCE loss
In one sentence, the difference between the two: BCE answers "yes or no" questions, e.g. logistic regression outputting the probability that it will or will not rain tomorrow; CE answers "which one" questions, i.e. multi-class classification. BCE is paired with sigmoid, CE is paired with softmax.
BCE loss for multi-label / multi-task problems:
import torch
import torch.nn as nn

# using BCE loss in pytorch
sig = nn.Sigmoid()
input = torch.randn(5) # randomly construct the model's current predictions y_pred
input = sig(input)
target = torch.FloatTensor([1,0,1,0,1]) # ground-truth labels y_true
bceloss = nn.BCELoss() #bce loss
loss = bceloss(input, target)
print(loss)

# reproduce the computation by hand
l = 0
for x, y in zip(input, target):
    l += y * torch.log(x) + (1 - y) * torch.log(1 - x)
l = -l / 5  # sum, average, negate
print(l)

# compare with cross entropy: the difference is that CE requires the ground truth to be a single fixed class
# celoss = nn.CrossEntropyLoss()
# input = torch.FloatTensor([[0.1,0.2,0.3,0.1,0.3],[0.1,0.2,0.3,0.1,0.3]])
# target = torch.LongTensor([2,3])
# output = celoss(input, target)
# print(output)
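Since this subsection is about the multi-label / multi-task case, here is a short sketch of the pattern commonly used in practice (my own example, not from the course code): nn.BCEWithLogitsLoss applied to raw logits with one 0/1 target per label, which fuses the sigmoid and the BCE into a single, numerically stable operation.

import torch
import torch.nn as nn

# multi-label: each sample can switch on several of the 4 labels independently
logits = torch.randn(3, 4)                      # raw model outputs, no sigmoid applied
targets = torch.FloatTensor([[1, 0, 1, 0],
                             [0, 1, 0, 0],
                             [1, 1, 0, 1]])     # 0/1 per label, not a single class index
loss_fn = nn.BCEWithLogitsLoss()                # sigmoid + BCE combined
print(loss_fn(logits, targets))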
Derivation of cross entropy (the CE and BCE losses):
https://zhuanlan.zhihu.com/p/667371691
Softmax and sigmoid share a convenient property: when differentiating through them with the chain rule, the derivative can be written in terms of the model's prediction y_pred and the target y. For sigmoid, the derivative is y_pred(1 - y_pred). When back-propagating the BCE loss to the weights w (assuming the input first goes through a linear transform wx + b; if not, the differentiation stops at the sigmoid), the factor y_pred(1 - y_pred) appearing in the denominator of dL/dy_pred cancels exactly against the sigmoid derivative, which simplifies the computation.
More intuitively, take the squared-error loss (y_pred - y)²: its derivative is 2(y_pred - y) multiplied by the derivative of y_pred with respect to w, and the factor (y_pred - y) is computed directly from the two stored values rather than recomputing y_pred as wx + b and then subtracting y.
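A quick numeric sketch of the cancellation described above (my own check, not from the course material): for sigmoid + BCE, the gradient of the loss with respect to the pre-sigmoid logit z collapses to y_pred - y, and autograd confirms it.

import torch

z = torch.tensor(0.7, requires_grad=True)   # pre-sigmoid logit
y = torch.tensor(1.0)                       # true label

y_pred = torch.sigmoid(z)
loss = -(y * torch.log(y_pred) + (1 - y) * torch.log(1 - y_pred))
loss.backward()

print(z.grad)             # autograd gradient dL/dz
print((y_pred - y).item())  # analytic form y_pred - y: the two values match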
evaluate:
# -*- coding: utf-8 -*-
import torch
from loader import load_data

"""
Model evaluation
"""

class Evaluator:
    def __init__(self, config, model, logger):
        self.config = config
        self.model = model
        self.logger = logger
        self.valid_data = load_data(config["valid_data_path"], config, shuffle=False)
        self.stats_dict = {"correct": 0, "wrong": 0}  # store evaluation results

    def eval(self, epoch):
        self.logger.info("Start evaluating the model of epoch %d:" % epoch)
        self.model.eval()
        self.stats_dict = {"correct": 0, "wrong": 0}  # reset the stats of the previous round
        for index, batch_data in enumerate(self.valid_data):
            if torch.cuda.is_available():
                batch_data = [d.cuda() for d in batch_data]
            input_ids, labels = batch_data  # adjust here when the inputs change, e.g. multiple inputs or outputs
            with torch.no_grad():
                pred_results = self.model(input_ids)  # predict with current parameters, without passing labels
            self.write_stats(labels, pred_results)
        acc = self.show_stats()
        return acc

    def write_stats(self, labels, pred_results):
        assert len(labels) == len(pred_results)
        for true_label, pred_label in zip(labels, pred_results):
            pred_label = torch.argmax(pred_label)
            if int(true_label) == int(pred_label):
                self.stats_dict["correct"] += 1
            else:
                self.stats_dict["wrong"] += 1
        return

    def show_stats(self):
        correct = self.stats_dict["correct"]
        wrong = self.stats_dict["wrong"]
        self.logger.info("Total number of evaluated samples: %d" % (correct + wrong))
        self.logger.info("Correct predictions: %d, wrong predictions: %d" % (correct, wrong))
        self.logger.info("Accuracy: %f" % (correct / (correct + wrong)))
        self.logger.info("--------------------")
        return correct / (correct + wrong)
loader:
# -*- coding: utf-8 -*-

import json
import re
import os
import torch
import numpy as np
from torch.utils.data import Dataset, DataLoader
from transformers import BertTokenizer
"""
Data loading
"""

class DataGenerator:
    def __init__(self, data_path, config):
        self.config = config
        self.path = data_path
        self.index_to_label = {0: '家居', 1: '房产', 2: '股票', 3: '社会', 4: '文化',
                               5: '国际', 6: '教育', 7: '军事', 8: '彩票', 9: '旅游',
                               10: '体育', 11: '科技', 12: '汽车', 13: '健康',
                               14: '娱乐', 15: '财经', 16: '时尚', 17: '游戏'}
        self.label_to_index = dict((y, x) for x, y in self.index_to_label.items())
        self.config["class_num"] = len(self.index_to_label)
        if self.config["model_type"] == "bert":
            self.tokenizer = BertTokenizer.from_pretrained(config["pretrain_model_path"])
        self.vocab = load_vocab(config["vocab_path"])
        self.config["vocab_size"] = len(self.vocab)
        self.load()

    def load(self):
        self.data = []
        with open(self.path, encoding="utf8") as f:
            for line in f:
                line = json.loads(line)
                tag = line["tag"]
                label = self.label_to_index[tag]
                title = line["title"]
                if self.config["model_type"] == "bert":
                    input_id = self.tokenizer.encode(title, max_length=self.config["max_length"], pad_to_max_length=True)
                else:
                    input_id = self.encode_sentence(title)
                input_id = torch.LongTensor(input_id)
                label_index = torch.LongTensor([label])
                self.data.append([input_id, label_index])
        return

    def encode_sentence(self, text):
        input_id = []
        for char in text:
            input_id.append(self.vocab.get(char, self.vocab["[UNK]"]))
        input_id = self.padding(input_id)
        return input_id

    # pad or truncate the input sequence so that a whole batch can be processed together
    def padding(self, input_id):
        input_id = input_id[:self.config["max_length"]]
        input_id += [0] * (self.config["max_length"] - len(input_id))
        return input_id

    def __len__(self):
        return len(self.data)

    def __getitem__(self, index):
        return self.data[index]

def load_vocab(vocab_path):
    token_dict = {}
    with open(vocab_path, encoding="utf8") as f:
        for index, line in enumerate(f):
            token = line.strip()
            token_dict[token] = index + 1  # index 0 is reserved for padding, so start from 1
    return token_dict

# wrap the data with torch's own DataLoader class
def load_data(data_path, config, shuffle=True):
    dg = DataGenerator(data_path, config)
    dl = DataLoader(dg, batch_size=config["batch_size"], shuffle=shuffle)
    return dl

if __name__ == "__main__":
    from config import Config
    dg = DataGenerator("valid_tag_news.json", Config)
    print(dg[1])
main:
# -*- coding: utf-8 -*-

import torch
import os
import random
import numpy as np
import logging
from config import Config
from model import TorchModel, choose_optimizer
from evaluate import Evaluator
from loader import load_data
#[DEBUG, INFO, WARNING, ERROR, CRITICAL]
logging.basicConfig(level=logging.INFO, format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

"""
Main training script
"""

seed = Config["seed"]
random.seed(seed)
np.random.seed(seed)
torch.manual_seed(seed)
torch.cuda.manual_seed_all(seed)

def main(config):
    # create the directory where models are saved
    if not os.path.isdir(config["model_path"]):
        os.mkdir(config["model_path"])
    # load the training data
    train_data = load_data(config["train_data_path"], config)
    # build the model
    model = TorchModel(config)
    # check whether a gpu is available
    cuda_flag = torch.cuda.is_available()
    if cuda_flag:
        logger.info("GPU available, moving the model to GPU")
        model = model.cuda()
    # build the optimizer
    optimizer = choose_optimizer(config, model)
    # build the evaluator
    evaluator = Evaluator(config, model, logger)
    # training loop
    for epoch in range(config["epoch"]):
        epoch += 1
        model.train()
        logger.info("epoch %d begin" % epoch)
        train_loss = []
        for index, batch_data in enumerate(train_data):
            if cuda_flag:
                batch_data = [d.cuda() for d in batch_data]
            optimizer.zero_grad()
            input_ids, labels = batch_data  # adjust here when the inputs change, e.g. multiple inputs or outputs
            loss = model(input_ids, labels)
            loss.backward()
            optimizer.step()
            train_loss.append(loss.item())
            if index % int(len(train_data) / 2) == 0:
                logger.info("batch loss %f" % loss)
        logger.info("epoch average loss: %f" % np.mean(train_loss))
        acc = evaluator.eval(epoch)
        # model_path = os.path.join(config["model_path"], "epoch_%d.pth" % epoch)
        # torch.save(model.state_dict(), model_path)  # save model weights
    return acc

if __name__ == "__main__":
    main(Config)

    # for model in ["cnn"]:
    #     Config["model_type"] = model
    #     print("Accuracy of the last epoch:", main(Config), "current config:", Config["model_type"])

    # compare all models
    # the intermediate logs can be turned off to avoid excessive output
    # grid search over hyperparameters
    for model in ["gated_cnn", 'bert', 'lstm']:
        Config["model_type"] = model
        for lr in [1e-3, 1e-4]:
            Config["learning_rate"] = lr
            for hidden_size in [128]:
                Config["hidden_size"] = hidden_size
                for batch_size in [64, 128]:
                    Config["batch_size"] = batch_size
                    for pooling_style in ["avg", 'max']:
                        Config["pooling_style"] = pooling_style
                        print("Accuracy of the last epoch:", main(Config), "current config:", Config)
model:
# -*- coding: utf-8 -*-

import torch
import torch.nn as nn
from torch.optim import Adam, SGD
from transformers import BertModel
"""
Build the network structure
"""

class TorchModel(nn.Module):
    def __init__(self, config):
        super(TorchModel, self).__init__()
        hidden_size = config["hidden_size"]
        vocab_size = config["vocab_size"] + 1
        class_num = config["class_num"]
        model_type = config["model_type"]
        num_layers = config["num_layers"]
        self.use_bert = False
        self.embedding = nn.Embedding(vocab_size, hidden_size, padding_idx=0)
        if model_type == "fast_text":
            self.encoder = lambda x: x
        elif model_type == "lstm":
            self.encoder = nn.LSTM(hidden_size, hidden_size, num_layers=num_layers, batch_first=True)
        elif model_type == "gru":
            self.encoder = nn.GRU(hidden_size, hidden_size, num_layers=num_layers, batch_first=True)
        elif model_type == "rnn":
            self.encoder = nn.RNN(hidden_size, hidden_size, num_layers=num_layers, batch_first=True)
        elif model_type == "cnn":
            self.encoder = CNN(config)
        elif model_type == "gated_cnn":
            self.encoder = GatedCNN(config)
        elif model_type == "stack_gated_cnn":
            self.encoder = StackGatedCNN(config)
        elif model_type == "rcnn":
            self.encoder = RCNN(config)
        elif model_type == "bert":
            self.use_bert = True
            self.encoder = BertModel.from_pretrained(config["pretrain_model_path"], return_dict=False)
            hidden_size = self.encoder.config.hidden_size
        elif model_type == "bert_lstm":
            self.use_bert = True
            self.encoder = BertLSTM(config)
            hidden_size = self.encoder.bert.config.hidden_size
        elif model_type == "bert_cnn":
            self.use_bert = True
            self.encoder = BertCNN(config)
            hidden_size = self.encoder.bert.config.hidden_size
        elif model_type == "bert_mid_layer":
            self.use_bert = True
            self.encoder = BertMidLayer(config)
            hidden_size = self.encoder.bert.config.hidden_size
        self.classify = nn.Linear(hidden_size, class_num)
        self.pooling_style = config["pooling_style"]
        self.loss = nn.functional.cross_entropy  # use cross-entropy loss

    # when true labels are given, return the loss; otherwise return the predictions
    def forward(self, x, target=None):
        if self.use_bert:
            # bert returns (sequence_output, pooler_output)
            # sequence_output: batch_size, max_len, hidden_size
            # pooler_output: batch_size, hidden_size
            x = self.encoder(x)
        else:
            x = self.embedding(x)  # input shape: (batch_size, sen_len)
            x = self.encoder(x)  # input shape: (batch_size, sen_len, input_dim)
        if isinstance(x, tuple):  # RNN-style models also return the hidden state; keep only the sequence output
            x = x[0]
        # pool over the sequence to get a sentence vector
        if self.pooling_style == "max":
            self.pooling_layer = nn.MaxPool1d(x.shape[1])
        else:
            self.pooling_layer = nn.AvgPool1d(x.shape[1])
        x = self.pooling_layer(x.transpose(1, 2)).squeeze()  # input shape: (batch_size, sen_len, input_dim)
        # alternatively, use the vector at the last position of the sequence
        # x = x[:, -1, :]
        predict = self.classify(x)  # input shape: (batch_size, input_dim)
        if target is not None:
            return self.loss(predict, target.squeeze())
        else:
            return predict

class CNN(nn.Module):
    def __init__(self, config):
        super(CNN, self).__init__()
        hidden_size = config["hidden_size"]
        kernel_size = config["kernel_size"]
        pad = int((kernel_size - 1) / 2)
        self.cnn = nn.Conv1d(hidden_size, hidden_size, kernel_size, bias=False, padding=pad)

    def forward(self, x):  # x: (batch_size, max_len, embeding_size)
        return self.cnn(x.transpose(1, 2)).transpose(1, 2)

class GatedCNN(nn.Module):
    def __init__(self, config):
        super(GatedCNN, self).__init__()
        self.cnn = CNN(config)
        self.gate = CNN(config)

    def forward(self, x):
        a = self.cnn(x)
        b = self.gate(x)
        b = torch.sigmoid(b)
        return torch.mul(a, b)

class StackGatedCNN(nn.Module):
    def __init__(self, config):
        super(StackGatedCNN, self).__init__()
        self.num_layers = config["num_layers"]
        self.hidden_size = config["hidden_size"]
        # a ModuleList holds several sub-modules and can be indexed like a list
        self.gcnn_layers = nn.ModuleList(
            GatedCNN(config) for i in range(self.num_layers)
        )
        self.ff_liner_layers1 = nn.ModuleList(
            nn.Linear(self.hidden_size, self.hidden_size) for i in range(self.num_layers)
        )
        self.ff_liner_layers2 = nn.ModuleList(
            nn.Linear(self.hidden_size, self.hidden_size) for i in range(self.num_layers)
        )
        self.bn_after_gcnn = nn.ModuleList(
            nn.LayerNorm(self.hidden_size) for i in range(self.num_layers)
        )
        self.bn_after_ff = nn.ModuleList(
            nn.LayerNorm(self.hidden_size) for i in range(self.num_layers)
        )

    def forward(self, x):
        # mimic bert's transformer block, with self-attention replaced by a gated cnn
        for i in range(self.num_layers):
            gcnn_x = self.gcnn_layers[i](x)
            x = gcnn_x + x  # gated cnn + residual
            x = self.bn_after_gcnn[i](x)  # then layer norm
            # mimic the feed-forward layer with two linear layers
            l1 = self.ff_liner_layers1[i](x)  # first linear layer
            l1 = torch.relu(l1)  # bert uses gelu here
            l2 = self.ff_liner_layers2[i](l1)  # second linear layer
            x = self.bn_after_ff[i](x + l2)  # residual, then layer norm
        return x

class RCNN(nn.Module):
    def __init__(self, config):
        super(RCNN, self).__init__()
        hidden_size = config["hidden_size"]
        self.rnn = nn.RNN(hidden_size, hidden_size)
        self.cnn = GatedCNN(config)

    def forward(self, x):
        x, _ = self.rnn(x)
        x = self.cnn(x)
        return x

class BertLSTM(nn.Module):
    def __init__(self, config):
        super(BertLSTM, self).__init__()
        self.bert = BertModel.from_pretrained(config["pretrain_model_path"], return_dict=False)
        self.rnn = nn.LSTM(self.bert.config.hidden_size, self.bert.config.hidden_size, batch_first=True)

    def forward(self, x):
        x = self.bert(x)[0]
        x, _ = self.rnn(x)
        return x

class BertCNN(nn.Module):
    def __init__(self, config):
        super(BertCNN, self).__init__()
        self.bert = BertModel.from_pretrained(config["pretrain_model_path"], return_dict=False)
        config["hidden_size"] = self.bert.config.hidden_size
        self.cnn = CNN(config)

    def forward(self, x):
        x = self.bert(x)[0]
        x = self.cnn(x)
        return x

class BertMidLayer(nn.Module):
    def __init__(self, config):
        super(BertMidLayer, self).__init__()
        self.bert = BertModel.from_pretrained(config["pretrain_model_path"], return_dict=False)
        self.bert.config.output_hidden_states = True

    def forward(self, x):
        layer_states = self.bert(x)[2]  # (13, batch, len, hidden)
        layer_states = torch.add(layer_states[-2], layer_states[-1])
        return layer_states

# choice of optimizer
def choose_optimizer(config, model):
    optimizer = config["optimizer"]
    learning_rate = config["learning_rate"]
    if optimizer == "adam":
        return Adam(model.parameters(), lr=learning_rate)
    elif optimizer == "sgd":
        return SGD(model.parameters(), lr=learning_rate)

if __name__ == "__main__":
    from config import Config
    # Config["class_num"] = 3
    # Config["vocab_size"] = 20
    # Config["max_length"] = 5
    Config["model_type"] = "bert"
    model = BertModel.from_pretrained(Config["pretrain_model_path"], return_dict=False)
    x = torch.LongTensor([[0, 1, 2, 3, 4], [5, 6, 7, 8, 9]])
    sequence_output, pooler_output = model(x)
    print(x[1], type(x[1]), len(x[1]))  # x has only two rows, so index 1 is the last valid row
    # model = TorchModel(Config)
    # label = torch.LongTensor([1,2])
    # print(model(x, label))
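The scripts above all import a Config dict from config.py, which is not shown here. Below is a hypothetical sketch of what it might contain, assembled only from the keys actually referenced in loader, main, and model; all paths and hyperparameter values are illustrative placeholders, not the course's actual settings.

# config.py -- a hypothetical sketch; paths and values are placeholders
Config = {
    "model_path": "output",                       # where checkpoints would be saved
    "train_data_path": "../data/train_tag_news.json",
    "valid_data_path": "../data/valid_tag_news.json",
    "vocab_path": "chars.txt",                    # one token per line, used by the non-bert models
    "pretrain_model_path": r"path/to/bert-base-chinese",
    "model_type": "gated_cnn",                    # see the branches in TorchModel
    "max_length": 30,
    "hidden_size": 128,
    "kernel_size": 3,
    "num_layers": 2,
    "epoch": 15,
    "batch_size": 64,
    "pooling_style": "max",                       # "max" or "avg"
    "optimizer": "adam",                          # "adam" or "sgd"
    "learning_rate": 1e-3,
    "seed": 987,
}

class_num and vocab_size are not listed because DataGenerator fills them in at load time.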