利用tensorflow2自帶keras搭建BiLSTM+CRF的序列標注模型,完成中文的命名實體識別任務。這里使用數據集是提前處理過的,已經轉成命名實體識別需要的“BIO”標注格式。
詳細代碼和數據:https://github.com/huanghao128/zh-nlp-demo
模型結構
BiLSTM+CRF模型就是在雙向LSTM模型的輸出位置接上一個CRF層,這樣可以學習到相鄰輸出之間的依賴關系,從而提高輸出標簽的整理准確率,模型結構圖如下圖:
數據處理
本文使用的數據是已經預處理過的,所以直接加載數據就好了,首先我們要加載字符詞典文件,還有BIO標記類別的索引化。其中BIO標記中B-PER和I-PER表示人名,B-LOC和I-LOC表示地名,B-ORG和I-ORG表示機構名。
char_vocab_path = "./data/char_vocabs.txt" # 字典文件
train_data_path = "./data/train_data" # 訓練數據
test_data_path = "./data/test_data" # 測試數據
special_words = ['<PAD>', '<UNK>'] # 特殊詞表示
# "BIO"標記的標簽
label2idx = {"O": 0,
"B-PER": 1, "I-PER": 2,
"B-LOC": 3, "I-LOC": 4,
"B-ORG": 5, "I-ORG": 6
}
# 索引和BIO標簽對應
idx2label = {idx: label for label, idx in label2idx.items()}
# 讀取字符詞典文件
with open(char_vocab_path, "r", encoding="utf8") as fo:
char_vocabs = [line.strip() for line in fo]
char_vocabs = special_words + char_vocabs
# 字符和索引編號對應
idx2vocab = {idx: char for idx, char in enumerate(char_vocabs)}
vocab2idx = {char: idx for idx, char in idx2vocab.items()}
然后加載訓練和測試集,並把原始數據和BIO標記轉成索引和類別編號。
# 讀取數據集語料
def read_corpus(corpus_path, vocab2idx, label2idx):
datas, labels = [], []
with open(corpus_path, encoding='utf-8') as fr:
lines = fr.readlines()
sent_, tag_ = [], []
for line in lines:
if line != '\n':
char, label = line.strip().split()
sent_.append(char)
tag_.append(label)
else:
sent_ids = [vocab2idx[char] if char in vocab2idx else vocab2idx['<UNK>'] for char in sent_]
tag_ids = [label2idx[label] if label in label2idx else 0 for label in tag_]
datas.append(sent_ids)
labels.append(tag_ids)
sent_, tag_ = [], []
return datas, labels
# 加載訓練集
train_datas, train_labels = read_corpus(train_data_path, vocab2idx, label2idx)
# 加載測試集
test_datas, test_labels = read_corpus(test_data_path, vocab2idx, label2idx)
數據的填充,以及類別的one-hot編碼。
from tensorflow import keras
from keras.preprocessing import sequence
import numpy as np
MAX_LEN = 100
VOCAB_SIZE = len(vocab2idx)
CLASS_NUMS = len(label2idx)
# padding data
train_datas = sequence.pad_sequences(train_datas, maxlen=MAX_LEN)
train_labels = sequence.pad_sequences(train_labels, maxlen=MAX_LEN)
test_datas = sequence.pad_sequences(test_datas, maxlen=MAX_LEN)
test_labels = sequence.pad_sequences(test_labels, maxlen=MAX_LEN)
print('x_train shape:', train_datas.shape)
print('x_test shape:', test_datas.shape)
# encoder one-hot
train_labels = keras.utils.to_categorical(train_labels, CLASS_NUMS)
test_labels = keras.utils.to_categorical(test_labels, CLASS_NUMS)
print('trainlabels shape:', train_labels.shape)
print('testlabels shape:', test_labels.shape)
模型構建和訓練
模型構建主要使用keras自帶的基礎模型組裝,首先是雙向LSTM模型,然后輸出接CRF模型,輸出對每個時刻的分類。
from tensorflow import keras
from tensorflow.keras import layers, models
from tensorflow.keras import backend as K
class CRF(layers.Layer):
def __init__(self, label_size):
super(CRF, self).__init__()
self.trans_params = tf.Variable(
tf.random.uniform(shape=(label_size, label_size)), name="transition")
@tf.function
def call(self, inputs, labels, seq_lens):
log_likelihood, self.trans_params = tfa.text.crf_log_likelihood(
inputs, labels, seq_lens,
transition_params=self.trans_params)
loss = tf.reduce_sum(-log_likelihood)
return loss
EPOCHS = 20
BATCH_SIZE = 64
EMBED_DIM = 128
HIDDEN_SIZE = 64
MAX_LEN = 100
VOCAB_SIZE = len(vocab2idx)
CLASS_NUMS = len(label2idx)
inputs = layers.Input(shape=(MAX_LEN,), name='input_ids', dtype='int32')
targets = layers.Input(shape=(MAX_LEN,), name='target_ids', dtype='int32')
seq_lens = layers.Input(shape=(), name='input_lens', dtype='int32')
x = layers.Embedding(input_dim=VOCAB_SIZE, output_dim=EMBED_DIM, mask_zero=True)(inputs)
x = layers.Bidirectional(layers.LSTM(HIDDEN_SIZE, return_sequences=True))(x)
logits = layers.Dense(CLASS_NUMS)(x)
loss = CRF(label_size=CLASS_NUMS)(logits, targets, seq_lens)
model = models.Model(inputs=[inputs, targets, seq_lens], outputs=loss)
print(model.summary())
模型訓練
由於構建模型時,輸出對應的是loss,所以loss定義需要進行一些修改。
class CustomLoss(keras.losses.Loss):
def call(self, y_true, y_pred):
loss, pred = y_pred
return loss
# 自定義Loss
# model.compile(loss=CustomLoss(), optimizer='adam')
# 或者使用lambda表達式
model.compile(loss=lambda y_true, y_pred: y_pred, optimizer='adam')
# 訓練
model.fit(x=[train_datas, train_labels, train_seq_lens], y=labels,
validation_split=0.1, batch_size=BATCH_SIZE, epochs=EPOCHS)
# 保存
model.save("output/bilstm_crf_ner")
結果預測
結果預測是我們訓練好模型后,重新加載模型,輸入新的要預測文本,然后識別出文本中的命名實體。這里首先要加載字符詞典,然后加載模型,之后對輸入文本預處理成字符序列,然后模型預測每個時刻的輸出類別,最后把類別轉成BIO標記,BIO標記組合成正確的命名實體。
# 加載模型
model = models.load_model("output/bilstm_crf_ner", compile=False)
# 如果需要繼續訓練,需要下面的重新compile
# model.compile(loss=lambda y_true, y_pred: y_pred, optimizer='adam')
分別提取轉移矩陣參數和BiLSTM的輸出
# 提取轉移矩陣參數
trans_params = model.get_layer('crf').get_weights()[0]
# 獲得BiLSTM的輸出logits
sub_model = models.Model(inputs=model.get_layer('input_ids').input,
outputs=model.get_layer('dense').output)
def predict(model, inputs, input_lens):
logits = sub_model.predict(inputs)
# 獲取CRF層的轉移矩陣
# crf_decode:viterbi解碼獲得結果
pred_seq, viterbi_score = tfa.text.crf_decode(logits, trans_params, input_lens)
return pred_seq
測試預測數據
maxlen = 100
sentence = "這里輸入是需要實體識別的句子"
sent_chars = list(sentence)
sent2id = [vocab2idx[word] if word in vocab2idx else vocab2idx['<UNK>'] for word in sent_chars]
sent2id_new = np.array([[0] * (maxlen-len(sent2id)) + sent2id[:maxlen]])
test_lens = np.array([100])
pred_seq = predict(model, sent2id_new, test_lens)
print(pred_seq)
對預測結果進行命名實體解析和提取
def get_valid_nertag(input_data, result_tags):
result_words = []
start, end =0, 1 # 實體開始結束位置標識
tag_label = "O" # 實體類型標識
for i, tag in enumerate(result_tags):
if tag.startswith("B"):
if tag_label != "O": # 當前實體tag之前有其他實體
result_words.append((input_data[start: end], tag_label)) # 獲取實體
tag_label = tag.split("-")[1] # 獲取當前實體類型
start, end = i, i+1 # 開始和結束位置變更
elif tag.startswith("I"):
temp_label = tag.split("-")[1]
if temp_label == tag_label: # 當前實體tag是之前實體的一部分
end += 1 # 結束位置end擴展
elif tag == "O":
if tag_label != "O": # 當前位置非實體 但是之前有實體
result_words.append((input_data[start: end], tag_label)) # 獲取實體
tag_label = "O" # 實體類型置"O"
start, end = i, i+1 # 開始和結束位置變更
if tag_label != "O": # 最后結尾還有實體
result_words.append((input_data[start: end], tag_label)) # 獲取結尾的實體
return result_words