The case study Tony brings you today is the Twitter sentiment analysis competition on Kaggle. In this case study, we will use the pretrained BERT model to carry out the data analysis for the whole competition.
Import the required libraries
import numpy as np
import pandas as pd
from math import ceil, floor
import tensorflow as tf
import tensorflow.keras.layers as L
from tensorflow.keras.initializers import TruncatedNormal
from sklearn import model_selection
from transformers import BertConfig, TFBertPreTrainedModel, TFBertMainLayer
from tokenizers import BertWordPieceTokenizer
Read and examine the data
In a competition, understanding the data is critical, so the first thing to do is read the data and look at its content and characteristics.
First, read the CSV data with pandas:
train_df = pd.read_csv('train.csv')
train_df.dropna(inplace=True)
test_df = pd.read_csv('test.csv')
test_df.loc[:, "selected_text"] = test_df.text.values
submission_df = pd.read_csv('sample_submission.csv')
Next, check how much data we have: there are 27,485 training rows and 3,535 test rows.
print("train numbers =", train_df.shape)
print("test numbers =", test_df.shape)
Then look at the fields and values of the first 10 rows of the training and test data. The tables contain the following fields:
- textID: the unique ID of each text record;
- text: the original sentence;
- selected_text: the phrase within the sentence that expresses the sentiment;
- sentiment: the sentiment type: neutral, positive, or negative;
From the data we can conclude that the goal is, given the sentiment label, to pick out the part of the original sentence that expresses that sentiment.
train_df.head(10)
test_df.head(10)
Define constants
# directory containing the pretrained BERT weights and vocabulary
PATH = "./bert-base-uncased/"
# maximum sequence length
MAX_SEQUENCE_LENGTH = 128
Load the vocabulary
BERT is trained against a fixed vocabulary, so in this competition we first load that vocabulary with BertWordPieceTokenizer; lowercase=True means every token is lowercased. Making the model case-insensitive reduces its resource usage.
TOKENIZER = BertWordPieceTokenizer(f"{PATH}/vocab.txt", lowercase=True)
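To get a feel for what the tokenizer returns, here is a minimal sketch on a made-up sentence (not part of the competition pipeline); enc.ids and enc.offsets are the two fields the preprocessing below relies on.
# Minimal sketch on a hypothetical sentence, purely for illustration.
enc = TOKENIZER.encode("Happy birthday to my best friend!")
print(enc.tokens)   # word-piece tokens, all lowercased because lowercase=True
print(enc.ids)      # index of every token in vocab.txt
print(enc.offsets)  # (start, end) character span of every token in the input string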
Define the data loader
Define the preprocessing function
def preprocess(tweet, selected_text, sentiment):
    # the raw strings arrive as byte strings; decode them back to utf-8
    tweet = tweet.decode('utf-8')
    selected_text = selected_text.decode('utf-8')
    sentiment = sentiment.decode('utf-8')

    tweet = " ".join(str(tweet).split())
    selected_text = " ".join(str(selected_text).split())

    # mark the characters that text and selected_text have in common
    idx_start, idx_end = None, None
    for index in (i for i, c in enumerate(tweet) if c == selected_text[0]):
        if tweet[index:index+len(selected_text)] == selected_text:
            idx_start = index
            idx_end = index + len(selected_text)
            break

    intersection = [0] * len(tweet)
    if idx_start is not None and idx_end is not None:
        for char_idx in range(idx_start, idx_end):
            intersection[char_idx] = 1

    # encode the raw text with the tokenizer; this returns the vocabulary index
    # of each token and the (start, end) character position of each token in the text
    enc = TOKENIZER.encode(tweet)
    input_ids_orig, offsets = enc.ids, enc.offsets

    target_idx = []
    for i, (o1, o2) in enumerate(offsets):
        if sum(intersection[o1: o2]) > 0:
            target_idx.append(i)

    target_start = target_idx[0]
    target_end = target_idx[-1]

    # vocabulary ids of the words "positive", "negative" and "neutral"
    sentiment_map = {
        'positive': 3893,
        'negative': 4997,
        'neutral': 8699,
    }

    # combine the sentiment token with the tokenized sentence to form the new input
    # (101 and 102 are the [CLS] and [SEP] token ids)
    input_ids = [101] + [sentiment_map[sentiment]] + [102] + input_ids_orig + [102]
    input_type_ids = [0] * (len(input_ids_orig) + 4)
    attention_mask = [1] * (len(input_ids_orig) + 4)
    offsets = [(0, 0), (0, 0), (0, 0)] + offsets + [(0, 0)]
    target_start += 3
    target_end += 3

    # BERT takes fixed-length inputs, so compute the padding length and
    # pad any sequence shorter than MAX_SEQUENCE_LENGTH
    padding_length = MAX_SEQUENCE_LENGTH - len(input_ids)
    if padding_length > 0:
        input_ids = input_ids + ([0] * padding_length)
        attention_mask = attention_mask + ([0] * padding_length)
        input_type_ids = input_type_ids + ([0] * padding_length)
        offsets = offsets + ([(0, 0)] * padding_length)
    elif padding_length < 0:
        pass

    return (
        input_ids, attention_mask, input_type_ids, offsets,
        target_start, target_end, tweet, selected_text, sentiment,
    )
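As a quick sanity check, the sketch below runs preprocess on a hypothetical row. Note that the function expects byte strings, because it is normally driven by tf.data.Dataset.from_generator, which passes NumPy byte strings.
# Hypothetical row, purely for illustration.
sample = preprocess(b"I really love this view", b"love this view", b"positive")
input_ids, attention_mask, input_type_ids, offsets, start, end, *_ = sample
print(len(input_ids))   # 128, after padding
print(start, end)       # token span of the selected text, shifted by the 3 leading special tokens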
Define the dataset class
class TweetDataset(tf.data.Dataset):
    outputTypes = (
        tf.dtypes.int32,  tf.dtypes.int32,   tf.dtypes.int32,
        tf.dtypes.int32,  tf.dtypes.float32, tf.dtypes.float32,
        tf.dtypes.string, tf.dtypes.string,  tf.dtypes.string,
    )

    outputShapes = (
        (128,),   (128,), (128,),
        (128, 2), (),     (),
        (),       (),     (),
    )

    def _generator(tweet, selected_text, sentiment):
        for tw, st, se in zip(tweet, selected_text, sentiment):
            yield preprocess(tw, st, se)

    def __new__(cls, tweet, selected_text, sentiment):
        return tf.data.Dataset.from_generator(
            cls._generator,
            output_types=cls.outputTypes,
            output_shapes=cls.outputShapes,
            args=(tweet, selected_text, sentiment)
        )

    @staticmethod
    def create(dataframe, batch_size, shuffle_buffer_size=-1):
        dataset = TweetDataset(
            dataframe.text.values,
            dataframe.selected_text.values,
            dataframe.sentiment.values
        )
        dataset = dataset.cache()
        if shuffle_buffer_size != -1:
            dataset = dataset.shuffle(shuffle_buffer_size)
        dataset = dataset.batch(batch_size)
        dataset = dataset.prefetch(tf.data.experimental.AUTOTUNE)
        return dataset
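A short usage sketch (not part of the original pipeline): build a loader over the training dataframe and inspect one batch to confirm the shapes declared in outputShapes.
# Sketch only: peek at one batch.
peek_loader = TweetDataset.create(train_df, batch_size=32, shuffle_buffer_size=2048)
for batch in peek_loader.take(1):
    input_ids, attention_mask, input_type_ids = batch[0], batch[1], batch[2]
    print(input_ids.shape, attention_mask.shape, batch[3].shape)  # (32, 128) (32, 128) (32, 128, 2)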
Define the model
We use the BERT model for this competition, so here is a brief introduction to it.
BERT stands for Bidirectional Encoder Representations from Transformers, i.e. the encoder of a bidirectional Transformer; the decoder is not used because it cannot see the information it is asked to predict.
The main innovation of the model lies in its pre-training method: it uses Masked LM and Next Sentence Prediction to capture word-level and sentence-level representations respectively.
BERT's main characteristics are:
- It uses the Transformer as the main framework of the algorithm; the Transformer captures the bidirectional relationships within a sentence more thoroughly;
- It is trained with the multi-task objectives of Masked Language Model and Next Sentence Prediction;
- It is trained on much larger data with much more powerful machines. Google has open-sourced the BERT model, so we can use BERT directly as a Word2Vec-style conversion matrix and apply it efficiently to our own tasks.
In essence, BERT runs self-supervised learning on a massive corpus so that each word ends up with a good feature representation.
For downstream tasks, we can then use BERT's representations directly as the word embeddings of that task. BERT therefore provides a model for other tasks to transfer from: it can be fine-tuned for the task, or frozen and used as a feature extractor.
For this competition we define a BertModel class, which builds on TFBertPreTrainedModel for inference.
We keep BERT's output in hidden_states, feed those hidden states into a Dense layer, and finally output the start and end positions of the span of text that expresses the sentiment.
These two positions are the positions of the tokens we need to extract from the original text.
class BertModel(TFBertPreTrainedModel):
    # dropout rate, to reduce overfitting
    dr = 0.1
    # number of hidden states to concatenate
    hs = 2

    def __init__(self, config, *inputs, **kwargs):
        super().__init__(config, *inputs, **kwargs)
        self.bert = TFBertMainLayer(config, name="bert")
        self.concat = L.Concatenate()
        self.dropout = L.Dropout(self.dr)
        self.qa_outputs = L.Dense(
            config.num_labels,
            kernel_initializer=TruncatedNormal(stddev=config.initializer_range),
            dtype='float32',
            name="qa_outputs")

    @tf.function
    def call(self, inputs, **kwargs):
        _, _, hidden_states = self.bert(inputs, **kwargs)

        hidden_states = self.concat([
            hidden_states[-i] for i in range(1, self.hs+1)
        ])

        hidden_states = self.dropout(hidden_states, training=kwargs.get("training", False))
        logits = self.qa_outputs(hidden_states)
        start_logits, end_logits = tf.split(logits, 2, axis=-1)
        start_logits = tf.squeeze(start_logits, axis=-1)
        end_logits = tf.squeeze(end_logits, axis=-1)

        return start_logits, end_logits
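As a hedged sanity check (a sketch only, duplicating the model construction done later in the training section), the snippet below builds the model from the pretrained weights and runs a dummy batch to confirm that both outputs have shape (batch, sequence length). It assumes PATH and MAX_SEQUENCE_LENGTH from the constants above; probe_config and probe_model are throwaway names.
# Sketch only: verify output shapes with a dummy batch.
probe_config = BertConfig(output_hidden_states=True, num_labels=2)
probe_model = BertModel.from_pretrained(PATH, config=probe_config)
ids = tf.ones((2, MAX_SEQUENCE_LENGTH), dtype=tf.int32)
mask = tf.ones_like(ids)
types = tf.zeros_like(ids)
start_logits, end_logits = probe_model((ids, mask, types))
print(start_logits.shape, end_logits.shape)  # (2, 128) (2, 128)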
Define the training function
def train(model, dataset, loss_fn, optimizer):

    @tf.function
    def train_step(model, inputs, y_true, loss_fn, optimizer):
        with tf.GradientTape() as tape:
            y_pred = model(inputs, training=True)
            loss = loss_fn(y_true[0], y_pred[0])
            loss += loss_fn(y_true[1], y_pred[1])
            scaled_loss = optimizer.get_scaled_loss(loss)

        scaled_gradients = tape.gradient(scaled_loss, model.trainable_variables)
        gradients = optimizer.get_unscaled_gradients(scaled_gradients)
        optimizer.apply_gradients(zip(gradients, model.trainable_variables))
        return loss, y_pred

    epoch_loss = 0.
    for batch_num, sample in enumerate(dataset):
        loss, y_pred = train_step(model, sample[:3], sample[4:6], loss_fn, optimizer)
        epoch_loss += loss
        print(
            f"training ... batch {batch_num+1:03d} : "
            f"train loss {epoch_loss/(batch_num+1):.3f} ",
            end='\r')
Define the prediction function
def predict(model, dataset, loss_fn, optimizer):

    @tf.function
    def predict_step(model, inputs):
        return model(inputs)

    def to_numpy(*args):
        out = []
        for arg in args:
            if arg.dtype == tf.string:
                arg = [s.decode('utf-8') for s in arg.numpy()]
                out.append(arg)
            else:
                arg = arg.numpy()
                out.append(arg)
        return out

    offset = tf.zeros([0, 128, 2], dtype=tf.dtypes.int32)
    text = tf.zeros([0,], dtype=tf.dtypes.string)
    selected_text = tf.zeros([0,], dtype=tf.dtypes.string)
    sentiment = tf.zeros([0,], dtype=tf.dtypes.string)
    pred_start = tf.zeros([0, 128], dtype=tf.dtypes.float32)
    pred_end = tf.zeros([0, 128], dtype=tf.dtypes.float32)

    for batch_num, sample in enumerate(dataset):
        print(f"predicting ... batch {batch_num+1:03d}"+" "*20, end='\r')

        y_pred = predict_step(model, sample[:3])

        # add batch to accumulators
        pred_start = tf.concat((pred_start, y_pred[0]), axis=0)
        pred_end = tf.concat((pred_end, y_pred[1]), axis=0)
        offset = tf.concat((offset, sample[3]), axis=0)
        text = tf.concat((text, sample[6]), axis=0)
        selected_text = tf.concat((selected_text, sample[7]), axis=0)
        sentiment = tf.concat((sentiment, sample[8]), axis=0)

    pred_start, pred_end, text, selected_text, sentiment, offset = \
        to_numpy(pred_start, pred_end, text, selected_text, sentiment, offset)

    return pred_start, pred_end, text, selected_text, sentiment, offset
Evaluation function
The competition is scored with a word-level Jaccard similarity, computed as
jaccard(A, B) = |A ∩ B| / |A ∪ B| = |A ∩ B| / (|A| + |B| - |A ∩ B|)
i.e. the number of words shared between the predicted phrase and the ground-truth phrase, divided by the number of distinct words in their union.
def jaccard(str1, str2):
    a = set(str1.lower().split())
    b = set(str2.lower().split())
    c = a.intersection(b)
    return float(len(c)) / (len(a) + len(b) - len(c))
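A quick hand-check of the metric on a made-up pair of strings:
# Hypothetical pair: the only shared word is "hello",
# so jaccard = 1 / (2 + 2 - 1) ≈ 0.333
print(jaccard("hello world", "hello there"))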
Define the prediction decoding function
The decoding function takes the start and end indices predicted by the model, looks them up against the token offsets recorded earlier (the position of each token within the sample sentence), and extracts every word inside that span as our prediction.
def decode_prediction(pred_start, pred_end, text, offset, sentiment):

    def decode(pred_start, pred_end, text, offset):
        decoded_text = ""
        for i in range(pred_start, pred_end+1):
            decoded_text += text[offset[i][0]:offset[i][1]]
            if (i+1) < len(offset) and offset[i][1] < offset[i+1][0]:
                decoded_text += " "
        return decoded_text

    decoded_predictions = []
    for i in range(len(text)):
        if sentiment[i] == "neutral" or len(text[i].split()) < 2:
            decoded_text = text[i]
        else:
            idx_start = np.argmax(pred_start[i])
            idx_end = np.argmax(pred_end[i])
            if idx_start > idx_end:
                idx_end = idx_start
            decoded_text = str(decode(idx_start, idx_end, text[i], offset[i]))
            if len(decoded_text) == 0:
                decoded_text = text[i]
        decoded_predictions.append(decoded_text)

    return decoded_predictions
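To illustrate the decoding, the sketch below feeds hand-made logits for a single three-token sentence (offsets written by hand, without the three leading special tokens) and recovers the span.
# Hand-made example, purely illustrative.
toy_text = ["I love pizza"]
toy_offsets = [[(0, 1), (2, 6), (7, 12)]]   # character span of each token
toy_start = np.array([[0.1, 0.8, 0.1]])     # argmax -> token 1 ("love")
toy_end = np.array([[0.1, 0.1, 0.8]])       # argmax -> token 2 ("pizza")
print(decode_prediction(toy_start, toy_end, toy_text, toy_offsets, ["positive"]))
# -> ['love pizza']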
Start training
Split the training data into 5 folds and train for 5 epochs on each fold, using the Adam optimizer with a learning rate of 3e-5 and a batch size of 32.
num_folds = 5
num_epochs = 5
batch_size = 32
learning_rate = 3e-5

optimizer = tf.keras.optimizers.Adam(learning_rate)
optimizer = tf.keras.mixed_precision.experimental.LossScaleOptimizer(
    optimizer, 'dynamic')

config = BertConfig(output_hidden_states=True, num_labels=2)
model = BertModel.from_pretrained(PATH, config=config)
loss_fn = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True)

kfold = model_selection.KFold(
    n_splits=num_folds, shuffle=True, random_state=42)

test_preds_start = np.zeros((len(test_df), 128), dtype=np.float32)
test_preds_end = np.zeros((len(test_df), 128), dtype=np.float32)

for fold_num, (train_idx, valid_idx) in enumerate(kfold.split(train_df.text)):
    print("\nfold %02d" % (fold_num+1))

    # build the train, valid and test datasets
    train_dataset = TweetDataset.create(
        train_df.iloc[train_idx], batch_size, shuffle_buffer_size=2048)
    valid_dataset = TweetDataset.create(
        train_df.iloc[valid_idx], batch_size, shuffle_buffer_size=-1)
    test_dataset = TweetDataset.create(
        test_df, batch_size, shuffle_buffer_size=-1)

    best_score = float('-inf')
    for epoch_num in range(num_epochs):
        print("\nepoch %03d" % (epoch_num+1))

        train(model, train_dataset, loss_fn, optimizer)

        pred_start, pred_end, text, selected_text, sentiment, offset = \
            predict(model, valid_dataset, loss_fn, optimizer)

        selected_text_pred = decode_prediction(
            pred_start, pred_end, text, offset, sentiment)
        jaccards = []
        for i in range(len(selected_text)):
            jaccards.append(
                jaccard(selected_text[i], selected_text_pred[i]))

        score = np.mean(jaccards)
        print(f"valid jaccard epoch {epoch_num+1:03d}: {score}"+" "*15)

        if score > best_score:
            best_score = score

            # predict the test set
            test_pred_start, test_pred_end, test_text, _, test_sentiment, test_offset = \
                predict(model, test_dataset, loss_fn, optimizer)

    test_preds_start += test_pred_start * 0.2
    test_preds_end += test_pred_end * 0.2

    # reset the model between folds to avoid running out of memory
    session = tf.compat.v1.get_default_session()
    graph = tf.compat.v1.get_default_graph()
    del session, graph, model
    model = BertModel.from_pretrained(PATH, config=config)
Predict on the test data and generate the submission file
selected_text_pred = decode_prediction(
    test_preds_start, test_preds_end, test_text, test_offset, test_sentiment)

def f(selected):
    return " ".join(set(selected.lower().split()))

submission_df.loc[:, 'selected_text'] = selected_text_pred
submission_df['selected_text'] = submission_df['selected_text'].map(f)
submission_df.to_csv("submission.csv", index=False)
At submission time this solution ranked 153rd out of 553 teams, with a score of 0.68.
This Twitter sentiment analysis case study will later be available as a Demo image on 矩池雲, where it can be used directly. 矩池雲 also supports domestic deep learning frameworks such as Paddle, MindSpore, MegEngine, and Jittor, which can be run without any installation.