本文是用機器學習打造聊天機器人系列的第四篇,將先對主要模塊的代碼進行展示和解讀,末尾會給出完整代碼的地址。建議先看主要模塊的代碼解讀,有助於理解核心代碼的思路,然后瀏覽完整項目代碼的README文檔,將項目跑起來體驗以下,再針對性的根據接口去閱讀各模塊代碼的實現。
主要模塊代碼
構造特征向量
特征向量的構造有兩種思想,一種是one-hot,一種是Dristributed Representation(這里用word2vec實現),一般來說后者能夠更好的表示詞的含義,但是有時候我們使用的句子來自特殊的領域,word2vec模型的預訓練語料未必能夠表示的很好,所以這個時候用one-hot就可能會表現的更好。
- one-hot
def build_feature(self, sentence, w_i_dict):
"""
根據詞匯表構造句子向量,其中用到的'w_i_dict'參數會通過以下方法先構造好:
# 構建訓練語料庫
build_corpus_vocabulary()
# 訓練語料庫分詞
cut_corpus_vocabulary()
# 構建訓練語料庫詞匯反向索引
word_index_dict_ = load_vocabulary()
# 存儲訓練語料庫詞匯反向索引
dump_word_index(word_index_dict_)
:param sentence: 句子
:param w_i_dict: 詞匯-位置索引字典
:return: one-hot 向量
"""
# 分詞
sentence_seg = jieba.cut(sentence)
# 用0初始化one-hot向量,維數為詞匯表的詞的個數
sen_vec = np.zeros(len(w_i_dict))
# 詞匯表的詞的列表
w_i_dict_keys = w_i_dict.keys()
# one-hot向量對應詞在詞典中的位置至1
for word in sentence_seg:
if w_i_dict_keys.__contains__(word):
sen_vec[w_i_dict[word]] = 1
return sen_vec
- Dristributed Representation
def sum_vecs_avg(self, text):
"""
根據詞向量模型構建句子向量
:param text: 句子
:return:
"""
# 加載詞向量模型
word_vec_model = ModelsLoader().sf_words_vec_model
# 用0值初始化一個同維數的向量,如果你知道你的詞向量模型是多少維的,可以直接指定,不用采用下面的野路子
vec = np.zeros(word_vec_model['是'].shape[0])
# 分詞
words_list = list(jieba.cut(text))
for w in words_list:
try:
# 將所有詞的向量累加
vec = vec + word_vec_model[w]
except KeyError as e:
logging.warning('詞‘%s’,不在詞向量模型詞匯表中', w)
continue
except ValueError as e:
logging.error('Error:', e)
break
# 計算平均向量
vec = vec / len(words_list)
return vec
意圖分類
和特征向量的構建一樣,分兩種方式,一種是基於貝葉斯算法(對應上面的one-hot特征),另一種是基於句子向量各分量的算數平均值構成的向量和輸入向量的夾角余弦相似度來分類(對應上面的詞向量特征)。前者的訓練是根據樣本計算概率模型,后者的訓練是提前計算好每個類別的中心向量。
def train_clf(self):
"""
基於貝葉斯算法訓練意圖分類器,並存儲為文件,以便下次使用
:return:
"""
dump_path = "%s/classifier_mnb.m" % get_resources_trained_models()
# 加載訓練樣本數據
features_np, labels_np = load_train_data()
features_np = np.array(features_np)
labels_np = np.array(labels_np)
# 開始訓練
starttime = datetime.datetime.now()
print("開始訓練分類器...")
# 使用多項式朴素貝葉斯算法訓練模型
clf = MultinomialNB(alpha=0.1, fit_prior=True, class_prior=None)
# 從第10個開始納入訓練,前10將做為驗證集評估模型的表現
clf.fit(features_np[10:], labels_np[10:])
endtime = datetime.datetime.now()
print("===========訓練耗時: %s" % (endtime - starttime).seconds)
# 評估分類器在驗證集上的表現
print("評估結果:%s" % clf.score(features_np[:10], labels_np[:10]))
self.clf_nb = clf
# 存儲分類器
dump_clf(self)
print("分類器存儲位置:%s" % dump_path)
return self
def predict(self, feature_vec, clf):
"""
預測(基於貝葉斯模型)
:param feature_vec: 輸入句子的特征向量
:param clf: 訓練好的貝葉斯模型
:return:
"""
proba_pred_np = clf.clf_nb.predict_proba(np.array([feature_vec]))[0]
logging.debug("預測結果的概率:%s", proba_pred_np)
# 加載類別集合
labels_set = load_labels_set()
label_score_list = []
for i, num in enumerate(proba_pred_np):
# if num != 0.00000000e+00:
if num >= current_app.config['THRESHOLD_INTENT_RECOGNITION']:
label_score_list.append((labels_set[i], num))
if len(label_score_list) == 0: # 正常閾值下沒有匹配項,就降級匹配
logging.debug("意圖識別在正常分數閾值下沒有匹配到任何項,進行降級匹配...")
for i, num in enumerate(proba_pred_np):
# if num != 0.00000000e+00:
if num >= current_app.config['MINIMUM_THRESHOLD_INTENT_RECOGNITION']:
label_score_list.append((labels_set[i], num))
rs = sorted(label_score_list, key=lambda item: item[1], reverse=True)
return rs, [c for c, v in rs]
def train_clf(self):
"""
訓練分類器(基於中心向量的方式)
:return:
"""
data = DataLoader().load_train_data()
logging.info("開始訓練...")
_, labels_centroids_dict = self.cal_centroid_vec(data)
self.labels_centroids_dict = labels_centroids_dict
self.labels = list(labels_centroids_dict.keys())
logging.info("訓練完成!")
# 存儲分類器模型
self.dump(self)
return self
def cal_centroid_vec(self, data):
"""
構建“類別-中心向量”字典
:param data: {'類別':{examples:'句子樣本',centroid:'中心向量'}}
:return:
"""
labels_centroids_dict = {}
for the_label in data.keys():
centroid = self.get_centroid(data[the_label]["examples"])
data[the_label]["centroid"] = centroid
labels_centroids_dict[the_label] = centroid
return data, labels_centroids_dict
def get_centroid(self, examples):
"""
獲取當前意圖類別的中心向量。中心向量由examples中所有句子向量各分量上的算數平均數表示
:param examples: 當前類別下的所有樣本句子
:return:
"""
word_vec_model = ModelsLoader().sf_words_vec_model
word_dim = word_vec_model['是'].shape[0]
C = np.zeros((len(examples), word_dim))
for idx, text in enumerate(examples):
C[idx, :] = self.sum_vecs_avg(text)
centroid = np.mean(C, axis=0)
assert centroid.shape[0] == word_dim
return centroid
def predict(self, feature_vec, clf):
"""
預測意圖類別(基於向量夾角余弦值)
:param feature_vec: 輸入句子的特征向量
:param clf: 從接口繼承下來的參數,這里用不到
:return:
"""
intents = self.labels
# 分數計算規則:計算新句子的向量和當前意圖類別的中心向量的夾角余弦值,下面其實可以改進以下,用矩陣並行計算代替for循環,但是因為類別目前不多,影響暫時不大。
scores = [(label_, np.dot(feature_vec, self.labels_centroids_dict[label_]) / (
np.linalg.norm(feature_vec) * np.linalg.norm(self.labels_centroids_dict[label_]))) for label_ in
intents]
rs = sorted(scores, key=lambda item: item[1], reverse=True)
top1scores = rs[0][1]
top1label = rs[0][0]
logging.debug("top1的分數:%s,label:%s", top1scores, top1label)
if top1scores >= current_app.config['THRESHOLD_INTENT_RECOGNITION']:
rs = rs[:1]
elif top1scores >= current_app.config['MINIMUM_THRESHOLD_INTENT_RECOGNITION']:
logging.debug("意圖識別在正常分數閾值下沒有匹配到任何項,進行降級匹配...")
elif top1scores < current_app.config['MINIMUM_THRESHOLD_INTENT_RECOGNITION']:
logging.debug("意圖識別在最小分數閾值下沒有匹配到任何項...")
rs = []
return rs, [c for c, v in rs]
語義匹配
def compare(self, statement, statement_vec):
"""
比較夾角余弦值
:param statement: 輸入句子對象
:param statement_vec: 句子樣本特征向量,是一個二維list
:return: 輸入句子和各句子樣本的相似度構成的二維數組
"""
statement_text_vec = statement.text_vector
statement_vec = np.array(statement_vec)
# 向量化並行計算余弦值
similarity = np.dot(statement_text_vec, statement_vec.T) / (
np.linalg.norm(statement_text_vec) * np.linalg.norm(statement_vec, axis=1)).T
print("similarity.shape %s" % similarity.shape)
return similarity
chatterbot訓練
本項目里,作者把訓練語料的類型分成了閑聊和業務兩大類,下面你會看到很多SF關鍵字,就是指業務,至於為什么叫SF,是歷史遺留(lan)的問題,不必過於糾結。閑聊類目前我們不拆分,所以代碼和上面介紹chatterbot的時候的代碼類似,但是對於業務類的樣本,由於我們需要分成多個類型,所以這里要創建多個chatterbot實例,下面展示的是業務類的chatbot的實例化過程:
def train_sf_chatbot():
data_root_dir = path_configer.get_classifier_train_samples()
for file_name in os.listdir(data_root_dir):
if file_name.startswith("QA_sf_"):
__train(('%s/%s' % (get_chatter_corpus(), file_name)), file_name[:file_name.find('-')])
def __train(corpus_path, collection_name):
print("開始訓練SF...")
starttime = datetime.now()
chatbot = SF().chatters[collection_name]
chatbot.set_trainer(ListTrainer)
chatbot.train(read_custom(corpus_path))
print("SF訓練完成!")
endtime = datetime.now()
print("===========訓練耗時: %s秒" % (endtime - starttime).seconds)
@singleton
class SF(object):
def __init__(self):
logging.info('預加載sf詞向量模型...')
logging.info('預加載SF所有實例...')
labels = [file_name[:file_name.find("-")] for file_name in os.listdir(path_configer.get_chatter_corpus()) if
file_name.startswith("QA_sf_")]
chatters = {}
bot_name = current_app.config['DATABASE']
# 根據不同的類型,創建不同的ChatBot實例
for label in labels:
chatters[label] = (
ChatBot(
bot_name,
database=bot_name,
database_uri=current_app.config['DATABASE_URI'],
# 使用合適的詞向量模型時開啟
preprocessors=[
'kbqa_sf.train.chatter.sf.sf_preprocessors.sum_vecs_avg'
],
statement_comparison_function=WordVecComparator(),
# statement_comparison_function=levenshtein_distance,
logic_adapters=[{'import_path': 'kbqa_sf.train.chatter.sf.sf_adapter.BestMatchExtLogicAdapter'}],
storage_adapter="kbqa_sf.train.chatter.sf.sf_mongo_storage.MongoDatabaseExtAdapter",
ext_collection_name=label,
read_only=True)
)
self.chatters = chatters
logging.info('SF所有實例預加載完成!')
在線學習
chatterbot提供了學習接口,就是方便以后再追加新的問答對,代碼如下:
# a:問題對象Statement,q:回答對象Statement
chatbot_.learn_response(a, q)
但是光是執行上面的代碼,在我們的項目中是不夠的,因為當樣本庫變動了,我們的意圖分類器,詞匯-索引字典,句子-句向量字典都要重新生成。如果你的樣本庫數量不大,那么這個過程還是很快的,但是如果數據量比較大的話,比如上萬條,那么這個過程需要幾十秒到幾分鍾。所以不建議讓用戶能夠直接通過web頁面就使用這個學習的接口,而是采用異步的方式,先記錄下用戶提交的反饋,然后定時由程序在后台執行比較合適。當然,如果你是自己隨便玩玩,數據量不大的話,直接通過web頁面使用這個接口是最方便的了。在線學習的代碼如下,分為記錄和學習2個接口:
@qac.route('/record', methods=['POST'])
def record():
"""
將要學習的問題、答案、類別,寫入文件learn目錄下的wait-learn.txt、history-learn.txt
:return:
"""
qac_list = request.get_json()
learn_path = path_configer.get_learn()
wait_learn_path = "%s/%s" % (learn_path, "wait-learn.txt")
history_learn_path = "%s/%s" % (learn_path, "history-learn.txt")
with __record_lock:
fa_wait = codecs.open(wait_learn_path, "a", encoding="utf-8")
fa_history = codecs.open(history_learn_path, "a", encoding="utf-8")
for qac_item in qac_list:
q = qac_item["q"]
a = qac_item["a"]
c = qac_item["c"]
if 0 < len(a) <= 300 and len(q) > 0 and len(c) > 0:
content = 'Q %s\nA %s\nC %s\n' % (q, a, c)
fa_wait.write(content)
fa_history.write(
'%sT %s\n' % (content, time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))))
else:
return make_response(jsonify({'error': '參數不符合要求,請檢查!'}), 400)
fa_wait.close()
fa_history.close()
logging.debug("=========待學習問題記錄完成!")
return "success"
@qac.route('/learn/batch', methods=['GET'])
def learn_batch():
"""
批量學習給定的問題和答案:
重命名wait-learn.txt為learning.txt,讀取learning.txt的內容進行學習
:return:
"""
_learn_new_batch_lock = threading.Lock()
logging.debug("開始學習...")
starttime = datetime.datetime.now()
learn_path = path_configer.get_learn()
wait_learn_path = "%s/%s" % (learn_path, "wait-learn.txt")
learning_path = "%s/%s" % (learn_path, "learning.txt")
with __record_lock:
if os.path.exists(learning_path):
# 若上一次的臨時文件未能刪除,就在這里刪除。
os.remove(learning_path)
logging.info("=========發現上一次的臨時文件未能刪除,已刪除!")
if not os.path.exists(wait_learn_path):
msg = "nothing"
logging.info(msg)
return msg
os.rename(wait_learn_path, learning_path)
logging.debug("重命名wait-learn.txt為learning.txt ...")
with _learn_new_batch_lock:
logging.debug("讀取learning.txt的內容進行學習 ...")
with codecs.open(learning_path, "r", encoding="utf-8") as fr:
q = fr.readline().strip("\n\r")
while q != "":
a = fr.readline().strip("\n\r")
assert a.strip("\n\r") != "", 'q,a,c格式無法匹配!缺少a!'
c = fr.readline().strip("\n\r")
assert c.strip("\n\r") != "", 'q,a,c格式無法匹配!缺少a!'
# 添加q,a到指定的c類別文件;訓練c對應的chatterbot
logging.debug("添加%s,%s到指定的%s類別文件;訓練對應的chatterbot ...", q, a, c)
# 開始學習
learn_(q, a, c[c.find(" ") + 1:])
q = fr.readline().strip("\n\r")
logging.debug("learning.txt學習全部完成...")
logging.debug("完整的重新訓練分類器模型 ...")
IntentClassifier().full_retrain_clf()
logging.debug("構建文本-向量索引文件,並存儲 ...")
IntentClassifier().build_text_vec_indx()
logging.debug("加載文本向量索引文件 ...")
IntentClassifier().load_text_vec_indx()
# 刪除臨時的學習文件
os.remove(learning_path)
endtime = datetime.datetime.now()
print("===========本次學習耗時: %s秒" % (endtime - starttime).seconds)
logging.info("=========本次學習已全部完成!")
return "success"
def learn_(q, a, c):
"""
添加q,a到指定的c類別文件;訓練c對應的chatterbot
:param q: 問題
:param a: 答案
:param c: 分類
:return:
"""
file_names = [file_name for file_name in os.listdir(path_configer.get_chatter_corpus()) if
file_name.startswith(c)]
if not file_names:
logging.warning("未知的類別:%s,已忽略", c)
return
file_name = file_names[0]
file_path = "%s/%s" % (path_configer.get_chatter_corpus(), file_name)
# 追加到c對應的意圖分類文件中
with codecs.open(file_path, "a", encoding="utf-8") as fa:
if len(q) > 0 and len(a) > 0:
if os.path.getsize(file_path) == 0:
fa.write('%s' % q)
else:
fa.write('\n%s' % q)
fa.write('\n%s' % a)
# 學習問答
qa_learn(q, a, c)
return "success"
def qa_learn(q, a, c):
a_statement = Statement(a)
q_statement = Statement(q)
if c.startswith("QA_talk"):
chat_bot = Talk().chat
else:
chat_bot = SF().chatters[c]
chat_bot.learn_response(a_statement, q_statement)
以上是主要功能的代碼,若要獲取可運行的完整代碼,可以加作者微信(jiabao512859468)獲取,有任何相關技術問題,都歡迎和作者探討O(∩_∩)O~
ok,有了代碼,下一篇將介紹如何將聊天機器人項目應用到不同的業務領域,以及如何接入其他項目中。
本篇就這么多內容啦~,感謝閱讀O(∩_∩)O。