An earlier post, BERT實戰——基於Keras, introduced two libraries: keras_bert and bert4keras.
However, because bert4keras was still under active development, some function names and module locations kept changing, and that post only used bert4keras for sentiment analysis.
So this new post walks through both libraries, using the latest version of bert4keras.
bert4keras snapshot used in this post: 2019-11-09
If you are worried that bert4keras will keep changing and need something stable, you can stick with keras_bert for now.
Dataset:
Original GitHub link: https://github.com/bojone/bert4keras/tree/master/examples/datasets
Personal cloud drive: link: https://pan.baidu.com/s/1OAhNbRYpU1HW25_vChdRng  extraction code: uxax
Using keras_bert
Configure some hyperparameters, import the required packages, and set the file paths.
import json
import numpy as np
import pandas as pd
from keras_bert import load_trained_model_from_checkpoint, load_vocabulary, Tokenizer

# Hyperparameters
maxlen = 100
batch_size = 16
dropout_rate = 0.5
learning_rate = 1e-5
epochs = 15

path_prefix = "./test"

# Pretrained model directory
config_path = path_prefix + "/chinese_L-12_H-768_A-12/bert_config.json"
checkpoint_path = path_prefix + "/chinese_L-12_H-768_A-12/bert_model.ckpt"
dict_path = path_prefix + "/chinese_L-12_H-768_A-12/vocab.txt"
Read the data and build the training samples.

# Read the data
neg = pd.read_excel(path_prefix + "/data/neg.xls", header=None)
pos = pd.read_excel(path_prefix + "/data/pos.xls", header=None)

# Build the training data as (text, label) pairs
data = []
for d in neg[0]:
    data.append((d, 0))
for d in pos[0]:
    data.append((d, 1))
Load the vocabulary.

# Load the vocabulary
token_dict = load_vocabulary(dict_path)
# Build the tokenizer
tokenizer = Tokenizer(token_dict)
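As a quick sanity check (not in the original post), you can look at what the tokenizer produces; keras_bert's Tokenizer.encode(first=...) returns a list of token indices and a list of segment ids. The sample sentence below is a hypothetical example.

# Quick check of the tokenizer output (illustrative only)
sample_text = "這部電影真好看"  # hypothetical example sentence
indices, segments = tokenizer.encode(first=sample_text)
print(indices[:10])   # token ids, starting with [CLS]
print(segments[:10])  # segment ids, all 0 for a single sentence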
Split into training and validation sets.

# Split into training and validation sets with a 9:1 ratio
random_order = list(range(len(data)))
np.random.shuffle(random_order)
train_data = [data[j] for i, j in enumerate(random_order) if i % 10 != 0]
valid_data = [data[j] for i, j in enumerate(random_order) if i % 10 == 0]
Sequence padding and the generator used for training.

def seq_padding(X, padding=0):
    # Pad every sequence in X to the length of the longest one
    L = [len(x) for x in X]
    ML = max(L)
    return np.array([
        np.concatenate([x, [padding] * (ML - len(x))]) if len(x) < ML else x
        for x in X
    ])


class data_generator:
    def __init__(self, data, batch_size=batch_size):
        self.data = data
        self.batch_size = batch_size
        self.steps = len(self.data) // self.batch_size
        if len(self.data) % self.batch_size != 0:
            self.steps += 1

    def __len__(self):
        return self.steps

    def __iter__(self):
        while True:
            idxs = list(range(len(self.data)))
            np.random.shuffle(idxs)
            X1, X2, Y = [], [], []
            for i in idxs:
                d = self.data[i]
                text = d[0][:maxlen]
                # keras_bert's Tokenizer.encode returns (token indices, segment ids)
                x1, x2 = tokenizer.encode(first=text)
                y = d[1]
                X1.append(x1)
                X2.append(x2)
                Y.append([y])
                if len(X1) == self.batch_size or i == idxs[-1]:
                    X1 = seq_padding(X1)
                    X2 = seq_padding(X2)
                    Y = seq_padding(Y)
                    yield [X1, X2], Y
                    X1, X2, Y = [], [], []
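To verify the generator before training (a small check added here, not in the original code), you can pull one batch and inspect the array shapes; train_data from the split above is assumed to exist.

# Inspect one batch from the generator (illustrative only)
gen = data_generator(train_data)
[batch_x1, batch_x2], batch_y = next(iter(gen))
print(batch_x1.shape, batch_x2.shape, batch_y.shape)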
Load the BERT model and add a fully connected layer for prediction.

from keras.layers import *
from keras.models import Model
import keras.backend as K
from keras.optimizers import Adam

# trainable=True fine-tunes BERT during training;
# by default the BERT weights are not updated
bert_model = load_trained_model_from_checkpoint(config_path, checkpoint_path, trainable=True)

x1_in = Input(shape=(None,))
x2_in = Input(shape=(None,))

x = bert_model([x1_in, x2_in])
# Take the vector of the [CLS] token as the sentence representation
x = Lambda(lambda x: x[:, 0])(x)
x = Dropout(dropout_rate)(x)
p = Dense(1, activation='sigmoid')(x)

model = Model([x1_in, x2_in], p)
model.compile(
    loss='binary_crossentropy',
    optimizer=Adam(learning_rate),
    metrics=['accuracy']
)
model.summary()
Start training.

train_D = data_generator(train_data)
valid_D = data_generator(valid_data)

model.fit_generator(
    train_D.__iter__(),
    steps_per_epoch=len(train_D),
    epochs=epochs,
    validation_data=valid_D.__iter__(),
    validation_steps=len(valid_D)
)
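The post stops at training. As a hedged sketch (not from the original article), prediction on a new sentence could look like the following, reusing the tokenizer, seq_padding and maxlen defined above; predict_sentiment and the sample input are hypothetical.

# Illustrative inference sketch for the keras_bert model (hypothetical helper)
def predict_sentiment(text):
    x1, x2 = tokenizer.encode(first=text[:maxlen])
    X1 = seq_padding([x1])
    X2 = seq_padding([x2])
    prob = model.predict([X1, X2])[0][0]
    return ("positive" if prob > 0.5 else "negative", float(prob))

print(predict_sentiment("東西很好,物流也快"))  # hypothetical example input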
Using bert4keras
In case bert4keras is adjusted again and this code no longer matches the latest version, the date of the version used is recorded here.
Updated: 2019-11-09
Configure the hyperparameters, import the required packages, and set the pretrained model paths.
import json
import numpy as np
import pandas as pd
import os
from bert4keras.bert import build_bert_model
from bert4keras.backend import set_gelu
from bert4keras.utils import Tokenizer, load_vocab

set_gelu('tanh')  # Switch the gelu version

# Hyperparameters
maxlen = 100
batch_size = 16
dropout_rate = 0.5
learning_rate = 1e-5
epochs = 15

path_prefix = "./test"

# Pretrained model paths
config_path = path_prefix + "/chinese_L-12_H-768_A-12/bert_config.json"
checkpoint_path = path_prefix + "/chinese_L-12_H-768_A-12/bert_model.ckpt"
dict_path = path_prefix + "/chinese_L-12_H-768_A-12/vocab.txt"
Read the data and build the training samples.

# Read the data
neg = pd.read_excel(path_prefix + "/data/neg.xls", header=None)
pos = pd.read_excel(path_prefix + "/data/pos.xls", header=None)

data, tokens = [], {}

# Load the full vocabulary
_token_dict = load_vocab(dict_path)
# Build a temporary tokenizer
_tokenizer = Tokenizer(_token_dict)

# Build (text, label) pairs and count token frequencies
for d in neg[0]:
    data.append((d, 0))
    for t in _tokenizer.tokenize(d):
        tokens[t] = tokens.get(t, 0) + 1
for d in pos[0]:
    data.append((d, 1))
    for t in _tokenizer.tokenize(d):
        tokens[t] = tokens.get(t, 0) + 1
Trim the vocabulary, keeping only the tokens used in this task.

# Keep only tokens that appear at least 4 times
tokens = {i: j for i, j in tokens.items() if j >= 4}

# token_dict is the vocabulary needed for this task
# keep_words holds the indices of the retained tokens in BERT's original vocabulary
token_dict, keep_words = {}, []

for t in ['[PAD]', '[UNK]', '[CLS]', '[SEP]']:
    token_dict[t] = len(token_dict)
    keep_words.append(_token_dict[t])

for t in tokens:
    if t in _token_dict and t not in token_dict:
        token_dict[t] = len(token_dict)
        keep_words.append(_token_dict[t])

# Build the tokenizer
tokenizer = Tokenizer(token_dict)
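To see how much the vocabulary shrank, you can compare the sizes (a small check added here, not part of the original code):

# Compare the reduced vocabulary with BERT's original one
print("original vocab size:", len(_token_dict))
print("reduced vocab size:", len(token_dict))
assert len(token_dict) == len(keep_words)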
Split into training and validation sets.

if not os.path.exists('./random_order.json'):
    random_order = list(range(len(data)))
    np.random.shuffle(random_order)
    json.dump(
        random_order,
        open('./random_order.json', 'w'),
        indent=4
    )
else:
    random_order = json.load(open('./random_order.json'))

# Split into training and validation sets with a 9:1 ratio
train_data = [data[j] for i, j in enumerate(random_order) if i % 10 != 0]
valid_data = [data[j] for i, j in enumerate(random_order) if i % 10 == 0]
Padding and the data generator.

def seq_padding(X, padding=0):
    # Pad every sequence in X to the length of the longest one
    L = [len(x) for x in X]
    ML = max(L)
    return np.array([
        np.concatenate([x, [padding] * (ML - len(x))]) if len(x) < ML else x
        for x in X
    ])


class data_generator:
    def __init__(self, data, batch_size=batch_size):
        self.data = data
        self.batch_size = batch_size
        self.steps = len(self.data) // self.batch_size
        if len(self.data) % self.batch_size != 0:
            self.steps += 1

    def __len__(self):
        return self.steps

    def __iter__(self):
        while True:
            idxs = list(range(len(self.data)))
            np.random.shuffle(idxs)
            X1, X2, Y = [], [], []
            for i in idxs:
                d = self.data[i]
                text = d[0][:maxlen]
                # bert4keras's Tokenizer.encode returns (token ids, segment ids)
                x1, x2 = tokenizer.encode(text)
                y = d[1]
                X1.append(x1)
                X2.append(x2)
                Y.append([y])
                if len(X1) == self.batch_size or i == idxs[-1]:
                    X1 = seq_padding(X1)
                    X2 = seq_padding(X2)
                    Y = seq_padding(Y)
                    yield [X1, X2], Y
                    X1, X2, Y = [], [], []
Load the BERT model and add a fully connected layer for prediction.

from keras.layers import *
from keras.models import Model
import keras.backend as K
from keras.optimizers import Adam

model = build_bert_model(
    config_path,
    checkpoint_path,
    # Keep only the tokens in keep_words, shrinking the original vocabulary
    keep_words=keep_words,
)

# Take the vector of the [CLS] token as the sentence representation
output = Lambda(lambda x: x[:, 0])(model.output)
output = Dropout(dropout_rate)(output)
output = Dense(1, activation='sigmoid')(output)

model = Model(model.input, output)
model.compile(
    loss='binary_crossentropy',
    optimizer=Adam(learning_rate),
    metrics=['accuracy']
)
model.summary()
Start training.

train_D = data_generator(train_data)
valid_D = data_generator(valid_data)

model.fit_generator(
    train_D.__iter__(),
    steps_per_epoch=len(train_D),
    epochs=epochs,
    validation_data=valid_D.__iter__(),
    validation_steps=len(valid_D)
)
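As with the keras_bert version, inference is not shown in the original article. Here is a hedged sketch of what prediction could look like with this model; note that the bert4keras tokenizer built above only knows the trimmed vocabulary, and predict_sentiment plus the sample input are hypothetical.

# Illustrative inference sketch for the bert4keras model (hypothetical helper)
def predict_sentiment(text):
    x1, x2 = tokenizer.encode(text[:maxlen])
    X1 = seq_padding([x1])
    X2 = seq_padding([x2])
    prob = model.predict([X1, X2])[0][0]
    return ("positive" if prob > 0.5 else "negative", float(prob))

print(predict_sentiment("質量太差了,非常失望"))  # hypothetical example input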