tensorflow2實現BiLSTM+CRF中文命名實體識別


利用tensorflow2自帶keras搭建BiLSTM+CRF的序列標注模型,完成中文的命名實體識別任務。這里使用數據集是提前處理過的,已經轉成命名實體識別需要的“BIO”標注格式。

詳細代碼和數據:https://github.com/huanghao128/zh-nlp-demo

模型結構

BiLSTM+CRF模型就是在雙向LSTM模型的輸出位置接上一個CRF層,這樣可以學習到相鄰輸出之間的依賴關系,從而提高輸出標簽的整理准確率,模型結構圖如下圖:
BiLSTM+CRF模型結構

數據處理

本文使用的數據是已經預處理過的,所以直接加載數據就好了,首先我們要加載字符詞典文件,還有BIO標記類別的索引化。其中BIO標記中B-PER和I-PER表示人名,B-LOC和I-LOC表示地名,B-ORG和I-ORG表示機構名。

char_vocab_path = "./data/char_vocabs.txt" # 字典文件
train_data_path = "./data/train_data" # 訓練數據
test_data_path = "./data/test_data" # 測試數據

special_words = ['<PAD>', '<UNK>'] # 特殊詞表示

# "BIO"標記的標簽
label2idx = {"O": 0,
             "B-PER": 1, "I-PER": 2,
             "B-LOC": 3, "I-LOC": 4,
             "B-ORG": 5, "I-ORG": 6
             }
# 索引和BIO標簽對應
idx2label = {idx: label for label, idx in label2idx.items()}

# 讀取字符詞典文件
with open(char_vocab_path, "r", encoding="utf8") as fo:
    char_vocabs = [line.strip() for line in fo]
char_vocabs = special_words + char_vocabs

# 字符和索引編號對應
idx2vocab = {idx: char for idx, char in enumerate(char_vocabs)}
vocab2idx = {char: idx for idx, char in idx2vocab.items()}

然后加載訓練和測試集,並把原始數據和BIO標記轉成索引和類別編號。

# 讀取數據集語料
def read_corpus(corpus_path, vocab2idx, label2idx):
    datas, labels = [], []
    with open(corpus_path, encoding='utf-8') as fr:
        lines = fr.readlines()
    sent_, tag_ = [], []
    for line in lines:
        if line != '\n':
            char, label = line.strip().split()
            sent_.append(char)
            tag_.append(label)
        else:
            sent_ids = [vocab2idx[char] if char in vocab2idx else vocab2idx['<UNK>'] for char in sent_]
            tag_ids = [label2idx[label] if label in label2idx else 0 for label in tag_]
            datas.append(sent_ids)
            labels.append(tag_ids)
            sent_, tag_ = [], []
    return datas, labels
# 加載訓練集
train_datas, train_labels = read_corpus(train_data_path, vocab2idx, label2idx)
# 加載測試集
test_datas, test_labels = read_corpus(test_data_path, vocab2idx, label2idx)

數據的填充,以及類別的one-hot編碼。

from tensorflow import keras
from keras.preprocessing import sequence
import numpy as np

MAX_LEN = 100
VOCAB_SIZE = len(vocab2idx)
CLASS_NUMS = len(label2idx)

# padding data
train_datas = sequence.pad_sequences(train_datas, maxlen=MAX_LEN)
train_labels = sequence.pad_sequences(train_labels, maxlen=MAX_LEN)
test_datas = sequence.pad_sequences(test_datas, maxlen=MAX_LEN)
test_labels = sequence.pad_sequences(test_labels, maxlen=MAX_LEN)
print('x_train shape:', train_datas.shape)
print('x_test shape:', test_datas.shape)
# encoder one-hot
train_labels = keras.utils.to_categorical(train_labels, CLASS_NUMS)
test_labels = keras.utils.to_categorical(test_labels, CLASS_NUMS)
print('trainlabels shape:', train_labels.shape)
print('testlabels shape:', test_labels.shape)

模型構建和訓練

模型構建主要使用keras自帶的基礎模型組裝,首先是雙向LSTM模型,然后輸出接CRF模型,輸出對每個時刻的分類。

from tensorflow import keras
from tensorflow.keras import layers, models
from tensorflow.keras import backend as K

class CRF(layers.Layer):
    def __init__(self, label_size):
        super(CRF, self).__init__()
        self.trans_params = tf.Variable(
            tf.random.uniform(shape=(label_size, label_size)), name="transition")
    
    @tf.function
    def call(self, inputs, labels, seq_lens):
        log_likelihood, self.trans_params = tfa.text.crf_log_likelihood(
                                                inputs, labels, seq_lens,
                                                transition_params=self.trans_params)
        loss = tf.reduce_sum(-log_likelihood)
        return loss
EPOCHS = 20
BATCH_SIZE = 64
EMBED_DIM = 128
HIDDEN_SIZE = 64
MAX_LEN = 100
VOCAB_SIZE = len(vocab2idx)
CLASS_NUMS = len(label2idx)

inputs = layers.Input(shape=(MAX_LEN,), name='input_ids', dtype='int32')
targets = layers.Input(shape=(MAX_LEN,), name='target_ids', dtype='int32')
seq_lens = layers.Input(shape=(), name='input_lens', dtype='int32')

x = layers.Embedding(input_dim=VOCAB_SIZE, output_dim=EMBED_DIM, mask_zero=True)(inputs)
x = layers.Bidirectional(layers.LSTM(HIDDEN_SIZE, return_sequences=True))(x)
logits = layers.Dense(CLASS_NUMS)(x)
loss = CRF(label_size=CLASS_NUMS)(logits, targets, seq_lens)

model = models.Model(inputs=[inputs, targets, seq_lens], outputs=loss)

print(model.summary())

模型層次結構

模型訓練

由於構建模型時,輸出對應的是loss,所以loss定義需要進行一些修改。

class CustomLoss(keras.losses.Loss):
    def call(self, y_true, y_pred):
        loss, pred = y_pred
        return loss

# 自定義Loss
# model.compile(loss=CustomLoss(), optimizer='adam')
# 或者使用lambda表達式
model.compile(loss=lambda y_true, y_pred: y_pred, optimizer='adam')

# 訓練
model.fit(x=[train_datas, train_labels, train_seq_lens], y=labels, 
        validation_split=0.1, batch_size=BATCH_SIZE, epochs=EPOCHS)

# 保存
model.save("output/bilstm_crf_ner")

結果預測

結果預測是我們訓練好模型后,重新加載模型,輸入新的要預測文本,然后識別出文本中的命名實體。這里首先要加載字符詞典,然后加載模型,之后對輸入文本預處理成字符序列,然后模型預測每個時刻的輸出類別,最后把類別轉成BIO標記,BIO標記組合成正確的命名實體。

# 加載模型
model = models.load_model("output/bilstm_crf_ner", compile=False)
# 如果需要繼續訓練,需要下面的重新compile
# model.compile(loss=lambda y_true, y_pred: y_pred, optimizer='adam')

分別提取轉移矩陣參數和BiLSTM的輸出

# 提取轉移矩陣參數
trans_params = model.get_layer('crf').get_weights()[0]

# 獲得BiLSTM的輸出logits
sub_model = models.Model(inputs=model.get_layer('input_ids').input,
                        outputs=model.get_layer('dense').output)

def predict(model, inputs, input_lens):
    logits = sub_model.predict(inputs)
    # 獲取CRF層的轉移矩陣
    # crf_decode:viterbi解碼獲得結果
    pred_seq, viterbi_score = tfa.text.crf_decode(logits, trans_params, input_lens)
    return pred_seq

測試預測數據

maxlen = 100
sentence = "這里輸入是需要實體識別的句子"
sent_chars = list(sentence)
sent2id = [vocab2idx[word] if word in vocab2idx else vocab2idx['<UNK>'] for word in sent_chars]
sent2id_new = np.array([[0] * (maxlen-len(sent2id)) + sent2id[:maxlen]])
test_lens = np.array([100])

pred_seq = predict(model, sent2id_new, test_lens)
print(pred_seq)

對預測結果進行命名實體解析和提取

def get_valid_nertag(input_data, result_tags):
    result_words = []
    start, end =0, 1 # 實體開始結束位置標識
    tag_label = "O" # 實體類型標識
    for i, tag in enumerate(result_tags):
        if tag.startswith("B"):
            if tag_label != "O": # 當前實體tag之前有其他實體
                result_words.append((input_data[start: end], tag_label)) # 獲取實體
            tag_label = tag.split("-")[1] # 獲取當前實體類型
            start, end = i, i+1 # 開始和結束位置變更
        elif tag.startswith("I"):
            temp_label = tag.split("-")[1]
            if temp_label == tag_label: # 當前實體tag是之前實體的一部分
                end += 1 # 結束位置end擴展
        elif tag == "O":
            if tag_label != "O": # 當前位置非實體 但是之前有實體
                result_words.append((input_data[start: end], tag_label)) # 獲取實體
                tag_label = "O"  # 實體類型置"O"
            start, end = i, i+1 # 開始和結束位置變更
    if tag_label != "O": # 最后結尾還有實體
        result_words.append((input_data[start: end], tag_label)) # 獲取結尾的實體
    return result_words

詳細代碼和數據:https://github.com/huanghao128/zh-nlp-demo


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM