在Keras中用Bert進行情感分析


之前在BERT實戰——基於Keras一文中介紹了兩個庫 keras_bert 和 bert4keras

但是由於 bert4keras 處於開發階段,有些函數名稱和位置等等發生了變化,那篇文章只用了 bert4keras 進行情感分析

於是這裡新開一篇文章,將這 2 個庫都用一遍,其中 bert4keras 使用最新版本

本文所用bert4keras時間:2019-11-09

由於擔心 bert4keras 後續還會繼續變化,需要穩定版本的讀者可以先採用 keras_bert

數據集:

原始Github鏈接:https://github.com/bojone/bert4keras/tree/master/examples/datasets

個人網盤:鏈接: https://pan.baidu.com/s/1OAhNbRYpU1HW25_vChdRng 提取碼: uxax 

使用keras_bert

配置一些超參數,導入需要的包和設置文件路徑

import json

import numpy as np
import pandas as pd
from keras_bert import Tokenizer, load_trained_model_from_checkpoint, load_vocabulary
# Hyperparameters
maxlen = 100            # each text is cut to this many characters before encoding
batch_size = 16         # samples per generated batch
droup_out_rate = 0.5    # rate passed to the Dropout layer in front of the classifier
learning_rate = 1e-5    # learning rate handed to the Adam optimizer
epochs = 15             # number of training epochs
path_prefix = "./test"  # root directory holding the data and the pre-trained model

# Pre-trained model directory
config_path = path_prefix + "/chinese_L-12_H-768_A-12/bert_config.json"
checkpoint_path = path_prefix + "/chinese_L-12_H-768_A-12/bert_model.ckpt"
dict_path = path_prefix + "/chinese_L-12_H-768_A-12/vocab.txt"

讀取數據和構造訓練樣本

# Load the negative and positive spreadsheets; neither has a header row
neg = pd.read_excel(path_prefix + "/data/neg.xls", header=None)
pos = pd.read_excel(path_prefix + "/data/pos.xls", header=None)

# Build (text, label) pairs from column 0: negatives first with label 0,
# then positives with label 1
data = [(text, 0) for text in neg[0]] + [(text, 1) for text in pos[0]]

讀取字典

# Load the vocabulary from the file at dict_path
token_dict = load_vocabulary(dict_path)
# Build the tokenizer on top of that vocabulary
tokenizer = Tokenizer(token_dict)

拆分為訓練集和驗證集

# Split into training and validation sets at a 9:1 ratio.
# Shuffle the sample indices once, then send every 10th position of the
# shuffled order to validation and everything else to training.
random_order = list(range(len(data)))
np.random.shuffle(random_order)
valid_data = [data[idx] for idx in random_order[::10]]
train_data = [data[idx] for pos, idx in enumerate(random_order) if pos % 10]

序列padding 和 訓練用的生成器

def seq_padding(X, padding=0):
    """Right-pad every sequence in ``X`` to the length of the longest one.

    Args:
        X: a non-empty collection of sequences.
        padding: value appended to the sequences that are too short.

    Returns:
        ``np.array`` built from the padded sequences.
    """
    longest = max(len(seq) for seq in X)
    padded = []
    for seq in X:
        missing = longest - len(seq)
        if missing > 0:
            # Only short sequences are touched; full-length ones pass through as-is.
            seq = np.concatenate([seq, [padding] * missing])
        padded.append(seq)
    return np.array(padded)


class data_generator:
    """Endless source of shuffled, padded mini-batches.

    Uses the module-level ``tokenizer``, ``maxlen`` and ``seq_padding``.
    ``data`` is indexed as ``data[i][0]`` (text) and ``data[i][1]`` (label).
    """

    def __init__(self, data, batch_size=batch_size):
        self.data = data
        self.batch_size = batch_size
        # Ceiling division: a trailing partial batch still counts as one step.
        whole, leftover = divmod(len(self.data), self.batch_size)
        self.steps = whole + 1 if leftover != 0 else whole

    def __len__(self):
        """Return the number of batches in one pass over ``data``."""
        return self.steps

    def __iter__(self):
        """Yield ``([X1, X2], Y)`` batches forever, reshuffling on every pass."""
        while True:
            order = list(range(len(self.data)))
            np.random.shuffle(order)
            first_inputs, second_inputs, labels = [], [], []
            for position, sample_idx in enumerate(order, start=1):
                sample = self.data[sample_idx]
                # Text is truncated by characters before it reaches the tokenizer.
                encoded_a, encoded_b = tokenizer.encode(first=sample[0][:maxlen])
                first_inputs.append(encoded_a)
                second_inputs.append(encoded_b)
                labels.append([sample[1]])
                batch_full = len(first_inputs) == self.batch_size
                # The final sample of a pass flushes whatever is buffered.
                if batch_full or position == len(order):
                    yield (
                        [seq_padding(first_inputs), seq_padding(second_inputs)],
                        seq_padding(labels),
                    )
                    first_inputs, second_inputs, labels = [], [], []

讀取 bert 模型並增加一個全連接層用於預測

from keras.layers import *
from keras.models import Model
import keras.backend as K
from keras.optimizers import Adam

# Load the pre-trained BERT from its config and checkpoint.
# trainable=True fine-tunes BERT itself; by default BERT is not tuned.
# NOTE(review): the source had an empty argument slot here ("checkpoint_path, ,"),
# which is a syntax error. seq_len=None is restored on the assumption that the
# lost argument kept the sequence length variable, matching the
# Input(shape=(None,)) tensors below -- confirm against the installed keras_bert.
bert_model = load_trained_model_from_checkpoint(
    config_path,
    checkpoint_path,
    seq_len=None,
    trainable=True,
)

# Two variable-length inputs, fed with the X1 / X2 arrays from data_generator
x1_in = Input(shape=(None,))
x2_in = Input(shape=(None,))

x = bert_model([x1_in, x2_in])
# Keep only index 0 along axis 1 (presumably the [CLS] position -- verify)
x = Lambda(lambda x: x[:, 0])(x)
x = Dropout(droup_out_rate)(x)
# Single sigmoid unit: binary classification output
p = Dense(1, activation='sigmoid')(x)

model = Model([x1_in, x2_in], p)
model.compile(
    loss='binary_crossentropy',
    optimizer=Adam(learning_rate),
    metrics=['accuracy']
)
model.summary()

開始訓練

# Wrap both splits in batch generators
train_D = data_generator(train_data)
valid_D = data_generator(valid_data)

# Each generator never ends, so the step counts tell Keras where an epoch
# (and a validation pass) stops.
model.fit_generator(
    iter(train_D),
    validation_data=iter(valid_D),
    steps_per_epoch=len(train_D),
    validation_steps=len(valid_D),
    epochs=epochs,
)

使用bert4keras

為防止 bert4keras 又調整,導致代碼和最新版本不適配,這里記錄更新時間

更新時間:2019-11-09

配置超參數,導入需要的包和設置預訓練模型的路徑

import json
import numpy as np
import pandas as pd
import os
from bert4keras.bert import build_bert_model
from bert4keras.backend import set_gelu
from bert4keras.utils import Tokenizer, load_vocab
set_gelu('tanh') # Switch the GELU version to 'tanh'

# Hyperparameters
maxlen, batch_size, epochs = 100, 16, 15
droup_out_rate, learning_rate = 0.5, 1e-5

# Root directory for the data and the pre-trained model
path_prefix = "./test"

# Pre-trained model paths: config, checkpoint and vocabulary, in that order
config_path, checkpoint_path, dict_path = (
    path_prefix + suffix
    for suffix in (
        "/chinese_L-12_H-768_A-12/bert_config.json",
        "/chinese_L-12_H-768_A-12/bert_model.ckpt",
        "/chinese_L-12_H-768_A-12/vocab.txt",
    )
)

讀取數據和構造訓練樣本

# Load the negative and positive spreadsheets; neither has a header row
neg = pd.read_excel(path_prefix + "/data/neg.xls", header=None)
pos = pd.read_excel(path_prefix + "/data/pos.xls", header=None)

# Full vocabulary and a temporary tokenizer, used only to count token usage
_token_dict = load_vocab(dict_path)
_tokenizer = Tokenizer(_token_dict)

# data: (text, label) pairs; tokens: token -> number of occurrences
data, tokens = [], {}
for frame, label in ((neg, 0), (pos, 1)):
    # Negatives (label 0) are processed before positives (label 1)
    for text in frame[0]:
        data.append((text, label))
        for tok in _tokenizer.tokenize(text):
            tokens[tok] = tokens.get(tok, 0) + 1

精簡字典,只留下本任務用到的字

# Drop tokens seen fewer than 4 times
tokens = {tok: count for tok, count in tokens.items() if count >= 4}

# token_dict: the tokens this task needs, re-indexed from 0
# keep_words: ids of those same tokens in the original vocabulary
token_dict, keep_words = {}, []

special_tokens = ['[PAD]', '[UNK]', '[CLS]', '[SEP]']
# Special tokens take the first ids; frequent tokens known to the original
# vocabulary follow in their existing order.
candidates = special_tokens + [tok for tok in tokens if tok in _token_dict]
for tok in candidates:
    if tok in token_dict:
        # Already registered (a special token that also shows up in the corpus)
        continue
    token_dict[tok] = len(token_dict)
    keep_words.append(_token_dict[tok])

# Build the tokenizer on the reduced vocabulary
tokenizer = Tokenizer(token_dict)

拆分訓練集和驗證集

# Reuse a previously saved shuffle when one exists, so the split stays the same
# across runs; otherwise create one and persist it.
if not os.path.exists('./random_order.json'):
    random_order = list(range(len(data)))
    np.random.shuffle(random_order)
    # Context manager so the file is flushed and closed deterministically
    with open('./random_order.json', 'w') as f:
        json.dump(random_order, f, indent=4)
else:
    with open('./random_order.json') as f:
        random_order = json.load(f)


# Split into training and validation sets at a 9:1 ratio: every 10th position
# of the shuffled order goes to validation, the rest to training
train_data = [data[j] for i, j in enumerate(random_order) if i % 10 != 0]
valid_data = [data[j] for i, j in enumerate(random_order) if i % 10 == 0]

padding和生成器

def seq_padding(X, padding=0):
    """Pad the sequences in ``X`` on the right so they all share one length.

    Args:
        X: a non-empty collection of sequences.
        padding: filler value appended to sequences shorter than the longest.

    Returns:
        ``np.array`` of the padded sequences.
    """
    target = max(map(len, X))

    def _pad(row):
        gap = target - len(row)
        # Rows already at full length are returned untouched.
        return row if gap <= 0 else np.concatenate([row, [padding] * gap])

    return np.array([_pad(row) for row in X])


class data_generator:
    """Endless source of shuffled, padded mini-batches.

    Uses the module-level ``tokenizer``, ``maxlen`` and ``seq_padding``.
    ``data`` is indexed as ``data[i][0]`` (text) and ``data[i][1]`` (label).
    """

    def __init__(self, data, batch_size=batch_size):
        self.data = data
        self.batch_size = batch_size
        # Ceiling division: a trailing partial batch still counts as one step.
        whole, leftover = divmod(len(self.data), self.batch_size)
        self.steps = whole + 1 if leftover != 0 else whole

    def __len__(self):
        """Return the number of batches in one pass over ``data``."""
        return self.steps

    def __iter__(self):
        """Yield ``([X1, X2], Y)`` batches forever, reshuffling on every pass."""
        while True:
            order = list(range(len(self.data)))
            np.random.shuffle(order)
            first_inputs, second_inputs, labels = [], [], []
            for position, sample_idx in enumerate(order, start=1):
                sample = self.data[sample_idx]
                # Text is truncated by characters before it reaches the tokenizer.
                encoded_a, encoded_b = tokenizer.encode(sample[0][:maxlen])
                first_inputs.append(encoded_a)
                second_inputs.append(encoded_b)
                labels.append([sample[1]])
                batch_full = len(first_inputs) == self.batch_size
                # The final sample of a pass flushes whatever is buffered.
                if batch_full or position == len(order):
                    yield (
                        [seq_padding(first_inputs), seq_padding(second_inputs)],
                        seq_padding(labels),
                    )
                    first_inputs, second_inputs, labels = [], [], []

讀取 bert 模型並增加一個全連接層用於預測

from keras.layers import *
from keras.models import Model
import keras.backend as K
from keras.optimizers import Adam

# Build BERT from the checkpoint, keeping only the tokens listed in keep_words
# (trims the original vocabulary)
bert = build_bert_model(config_path, checkpoint_path, keep_words=keep_words)

# Keep only index 0 along axis 1 (presumably the [CLS] position -- verify)
first_position = Lambda(lambda t: t[:, 0])(bert.output)
regularized = Dropout(droup_out_rate)(first_position)
# Single sigmoid unit: binary classification output
probability = Dense(1, activation='sigmoid')(regularized)
model = Model(bert.input, probability)

model.compile(
    optimizer=Adam(learning_rate),
    loss='binary_crossentropy',
    metrics=['accuracy'],
)
model.summary()

開始訓練

# Wrap both splits in batch generators
train_D = data_generator(train_data)
valid_D = data_generator(valid_data)

# Each generator never ends, so the step counts tell Keras where an epoch
# (and a validation pass) stops.
model.fit_generator(
    iter(train_D),
    validation_data=iter(valid_D),
    steps_per_epoch=len(train_D),
    validation_steps=len(valid_D),
    epochs=epochs,
)


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM