Python 實現
import os

from py2neo import Graph
from pyhanlp import *
from sklearn.naive_bayes import MultinomialNB

# NOTE(review): assumes the files under "question/" are UTF-8 (with or
# without a BOM) -- confirm against the actual data files.
_DATA_ENCODING = "utf-8-sig"

# Abstract tag (e.g. "nm", "nnt") -> concrete word it replaced in the question
# that is currently being analysed.
abstractMap = {}
# word -> feature index; populated from load_vocabulary().
vocabulary = {}
# class index -> question template; populated from load_question_templates().
questions_pattern = {}
# Trained classifier; assigned by the script entry point at the bottom.
clf = None


def sentence_to_arrays(sentence):
    """Turn a sentence into a binary bag-of-words vector.

    The sentence is segmented with HanLP; every segmented word found in the
    module-level ``vocabulary`` sets the element at that word's index to 1.

    :param sentence: text to vectorise
    :return: list of 0/1 ints with ``len(vocabulary)`` elements
    """
    vector = [0] * len(vocabulary)
    for term in HanLP.segment(sentence):
        index = vocabulary.get(str(term.word))
        if index is not None:
            vector[index] = 1
    return vector


def query_abstract(query_sentence):
    """Replace the key words of a question with abstract part-of-speech tags.

    Each HanLP term whose nature contains ``nm`` (movie name), ``nr``
    (person name), ``x`` (score) or ``ng`` (genre) is replaced by a tag and
    the original word is remembered in ``abstractMap``.  The first person
    name becomes ``nnt`` and the second ``nnr``; further person names are
    kept verbatim.

    :param query_sentence: the question as typed by the user
    :return: the abstracted sentence, every token followed by one space
    """
    # Drop anything left over from an earlier call that failed half-way.
    abstractMap.clear()
    person_tags = ("nnt", "nnr")
    nr_count = 0
    pieces = []
    for term in HanLP.segment(query_sentence):
        print(term)
        word = str(term.word)
        # Only inspect the nature: testing the whole "word/nature" string
        # would also match these letters when they occur inside the word.
        nature = str(term.nature)
        if "nm" in nature:
            tag = "nm"
        elif "nr" in nature and nr_count < len(person_tags):
            tag = person_tags[nr_count]
            nr_count += 1
        elif "x" in nature:
            tag = "x"
        elif "ng" in nature:
            tag = "ng"
        else:
            tag = None

        if tag is None:
            pieces.append(word)
        else:
            abstractMap[tag] = word
            pieces.append(tag)
    print("========HanLP分詞結束========")
    return "".join(piece + " " for piece in pieces)


def query_classify(sentence):
    """Classify an abstracted sentence into a question template.

    :param sentence: abstracted sentence produced by ``query_abstract``
    :return: ``(template, index)``; ``template`` is ``None`` when
        ``questions_pattern`` has no entry for the predicted index
    :raises RuntimeError: if the module-level ``clf`` has not been set
    """
    if clf is None:
        raise RuntimeError("clf is not set; train the classifier before classifying")
    features = sentence_to_arrays(sentence)
    index = int(clf.predict([features])[0])
    print("the model index is " + str(index))
    return questions_pattern.get(index), index


def _reduce_tokens(query_pattern):
    """Split a template on spaces and swap each tag for its remembered word.

    Whole tokens are replaced (not substrings), so a word that was already
    substituted can never be rewritten by a later tag.  ``abstractMap`` is
    emptied afterwards.
    """
    tokens = [abstractMap.get(token, token) for token in query_pattern.split(" ")]
    abstractMap.clear()
    return tokens


def sentenceReduction(query_pattern):
    """Restore a question template to a concrete sentence.

    Every space-separated token that is a key of ``abstractMap`` is replaced
    by the original word; ``abstractMap`` is cleared afterwards.

    :param query_pattern: template such as ``"nnt ng 電影作品"``
    :return: the template with tags replaced by the original words
    """
    return " ".join(_reduce_tokens(query_pattern))


def analysis(query_sentence):
    """Run the abstraction -> classification -> restoration pipeline.

    :param query_sentence: the question as typed by the user
    :return: list whose first element is the template index and whose
        remaining elements are the tokens of the restored template
    :raises ValueError: if no template exists for the predicted index
    """
    print("原始句子:" + query_sentence)
    print("========HanLP開始分詞========")
    # Abstract the key words of the sentence into part-of-speech tags.
    abstract_str = query_abstract(query_sentence)
    print("句子抽象化結果:" + abstract_str)
    # Match the abstracted sentence against the trained templates.
    str_pattern, model_index = query_classify(abstract_str)
    if str_pattern is None:
        raise ValueError(f"no question template for class index {model_index}")
    print("句子套用模板結果:" + str_pattern)
    # Put the concrete words back in place of the tags.  The tokens are used
    # directly instead of re-splitting the joined sentence.
    tokens = _reduce_tokens(str_pattern)
    print("原始句子替換成系統可識別的結果:" + " ".join(tokens))
    return [model_index] + tokens


def load_vocabulary():
    """Read ``question/vocabulary.txt`` into a ``word -> index`` dict.

    Each non-blank line is ``index:word``.  Only the first colon separates
    the two fields.

    :return: dict mapping word to its integer index
    """
    mapping = {}
    with open("question/vocabulary.txt", "r", encoding=_DATA_ENCODING) as f:
        for line in f:
            if not line.strip():
                continue
            index, _, word = line.partition(":")
            mapping[word.strip()] = int(index)
    return mapping


def load_data(path="question"):
    """Build the training set from the sample files below ``path``.

    Every file whose name contains ``【`` is a sample file; the text between
    ``【`` and ``】`` is its integer class label (e.g. ``【3】劇情.txt`` -> 3)
    and each line is one sample sentence.  The module-level ``vocabulary``
    must be populated first because the sentences are vectorised with
    ``sentence_to_arrays``.

    :param path: directory that is walked recursively
    :return: ``(X, Y)``: feature vectors and their class labels
    """
    X = []
    Y = []
    for root, _dirs, files in os.walk(path):
        for f_name in files:
            if "【" not in f_name:
                continue
            label = int(f_name.split("】")[0].replace("【", ""))
            with open(os.path.join(root, f_name), encoding=_DATA_ENCODING) as f:
                for sentence in f:
                    X.append(sentence_to_arrays(sentence.strip()))
                    Y.append(label)
    return X, Y


def load_question_templates():
    """Read ``question/question_classification.txt`` into a dict.

    Each non-blank line is ``index:template``.  Only the first colon
    separates the two fields.

    :return: dict mapping integer class index to template string
    """
    path = "question/question_classification.txt"
    templates = {}
    with open(path, "r", encoding=_DATA_ENCODING) as f:
        for line in f:
            if not line.strip():
                continue
            index, _, template = line.partition(":")
            templates[int(index)] = template.strip()
    return templates


class QuestionService(object):
    """Answer movie questions by querying a Neo4j graph.

    Every ``get*`` handler except ``getMovieInfo`` receives ``result``, the
    list returned by ``analysis``: ``result[0]`` is the template index and
    the following elements are the tokens of the restored template.  All
    queries use Cypher parameters so that user text is never spliced into
    the query string.
    """

    def __init__(self):
        self.g = Graph(
            host="127.0.0.1", http_port=7474, user="neo4j", password="password"
        )

    def _fetch_column(self, cypher, column, parameters):
        """Run a parameterized query and return the non-null values of one column."""
        rows = self.g.run(cypher, parameters).data()
        values = (row.get(column) for row in rows)
        return [value for value in values if value is not None]

    def _fetch_first(self, cypher, column, parameters):
        """Return the first value of ``column``, or None when there is none."""
        values = self._fetch_column(cypher, column, parameters)
        return values[0] if values else None

    def _fetch_joined(self, cypher, column, parameters, sep=","):
        """Return all values of ``column`` joined by ``sep``, or None when empty."""
        values = self._fetch_column(cypher, column, parameters)
        return sep.join(str(value) for value in values) if values else None

    def answer(self, question) -> str:
        """Answer a natural-language question.

        :param question: the question as typed by the user
        :return: the answer as text, or a fixed apology when the question
            class is unknown or the graph holds no matching data
        """
        result = analysis(question)
        handlers = {
            0: self.getMovieRating,
            1: self.getMovieReleaseDate,
            2: self.getMovieTypes,
            # getMovieInfo takes the title itself rather than the result list.
            3: lambda r: self.getMovieInfo(r[1]),
            4: self.getMovieActors,
            5: self.getActorInfo,
            6: self.getMoviesByType,
            7: lambda r: ",".join(self.getActorMovies(r)),
            8: self.getActorMoviesByHScore,
            9: self.getActorMoviesByLScore,
            10: self.getActorMoviesType,
            # NOTE(review): class 11 reuses the plain filmography query, as
            # the original code did -- confirm this is the intended answer.
            11: lambda r: ",".join(self.getActorMovies(r)),
            12: self.getMoviesCount,
            13: self.getActorBirth,
        }
        handler = handlers.get(result[0])
        found = handler(result) if handler is not None else None
        # Compare against None/"" explicitly so a rating or count of 0 is
        # still reported as an answer.
        if found is None or found == "":
            return "sorry,小主,我沒有找到你要的答案"
        return str(found)

    def getMovieRating(self, result):
        """Template 0 (``nm 評分``): rating of a movie.

        :param result: analysis result; ``result[1]`` is the movie title
        :return: the movie's ``rating`` property, or None if not found
        """
        return self._fetch_first(
            "match(n:Movie) where n.title=$title return n.rating",
            "n.rating",
            {"title": result[1]},
        )

    def getMovieReleaseDate(self, result):
        """Template 1 (``nm 上映時間``): release date of a movie.

        :param result: analysis result; ``result[1]`` is the movie title
        :return: the movie's ``releasedate`` property, or None if not found
        """
        return self._fetch_first(
            "match(n:Movie) where n.title=$title return n.releasedate",
            "n.releasedate",
            {"title": result[1]},
        )

    def getMovieTypes(self, result):
        """Template 2 (``nm 類型``): genres of a movie.

        :param result: analysis result; ``result[1]`` is the movie title
        :return: comma-separated genre names, or None if not found
        """
        return self._fetch_joined(
            "match(n:Movie)-[r:is]->(b:Genre) where n.title=$title return b.name",
            "b.name",
            {"title": result[1]},
        )

    def getMovieInfo(self, title):
        """Template 3 (``nm 簡介``): introduction of a movie.

        :param title: movie title
        :return: the movie's ``introduction`` property, or None if not found
        """
        return self._fetch_first(
            "match(n:Movie) where n.title =$title return n.introduction",
            "n.introduction",
            {"title": title},
        )

    def getMovieActors(self, result):
        """Template 4 (``nm 演員列表``): people who acted in a movie.

        :param result: analysis result; ``result[1]`` is the movie title
        :return: comma-separated person names, or None if not found
        """
        return self._fetch_joined(
            "match(n:Person)-[:actedin]-(m:Movie) where m.title =$title return n.name",
            "n.name",
            {"title": result[1]},
        )

    def getActorInfo(self, result):
        """Template 5 (``nnt 介紹``): information about an actor.

        NOTE(review): the query returns the ``birthplace`` property although
        the template asks for an introduction -- confirm the intended field.

        :param result: analysis result; ``result[1]`` is the actor name
        :return: the person's ``birthplace`` property, or None if not found
        """
        return self._fetch_first(
            "match(n:Person) where n.name=$name return n.birthplace",
            "n.birthplace",
            {"name": result[1]},
        )

    def getMoviesByType(self, result):
        """Template 6 (``nnt ng 電影作品``): an actor's movies of one genre.

        :param result: analysis result; ``result[1]`` is the actor name and
            ``result[2]`` the genre name, matched as a regular expression
        :return: comma-separated distinct movie titles, or None if not found
        """
        return self._fetch_joined(
            "match(n:Person)-[:actedin]-(m:Movie) where n.name=$name "
            "match(g:Genre)-[:is]-(m) where g.name=~$gname "
            "return distinct m.title",
            "m.title",
            {"name": result[1], "gname": result[2]},
        )

    def getActorMovies(self, result):
        """Template 7 (``nnt 電影作品``): movies an actor appeared in.

        :param result: analysis result; ``result[1]`` is the actor name
        :return: list of movie titles (empty when nothing matches)
        """
        return self._fetch_column(
            "match(n:Person)-[:actedin]->(m:Movie) where n.name=$name return m.title",
            "m.title",
            {"name": result[1]},
        )

    def _actor_movies_by_score(self, result, operator):
        """Shared query for templates 8 and 9; ``operator`` is ``>`` or ``<``.

        ``operator`` is one of two fixed literals supplied by this class,
        never user text.  ``toFloat`` on both sides makes the comparison
        numeric whether the rating is stored as a number or as a string.
        """
        return self._fetch_joined(
            "match(n:Person)-[:actedin]-(m:Movie) where n.name =$name "
            f"and toFloat(m.rating) {operator} toFloat($score) return m.title",
            "m.title",
            # The score is the last token of "nnt 參演評分 大於/小於 x".
            {"name": result[1], "score": result[-1]},
        )

    def getActorMoviesByHScore(self, result):
        """Template 8 (``nnt 參演評分 大於 x``): actor's movies rated above x.

        :param result: analysis result; ``result[1]`` is the actor name and
            the last element is the score
        :return: comma-separated movie titles, or None if not found
        """
        return self._actor_movies_by_score(result, ">")

    def getActorMoviesByLScore(self, result):
        """Template 9 (``nnt 參演評分 小於 x``): actor's movies rated below x.

        :param result: analysis result; ``result[1]`` is the actor name and
            the last element is the score
        :return: comma-separated movie titles, or None if not found
        """
        return self._actor_movies_by_score(result, "<")

    def getActorMoviesType(self, result):
        """Template 10 (``nnt 電影類型``): genres of the movies an actor appeared in.

        :param result: analysis result; ``result[1]`` is the actor name
        :return: comma-separated distinct genre names, or None if not found
        """
        return self._fetch_joined(
            "match(n:Person)-[:actedin]-(m:Movie) where n.name =$name "
            "match(p:Genre)-[:is]-(m) return distinct p.name",
            "p.name",
            {"name": result[1]},
        )

    def getMoviesCount(self, result):
        """Template 12 (``nnt 電影數量``): number of ``actedin`` relations of an actor.

        :param result: analysis result; ``result[1]`` is the actor name
        :return: the count returned by the query, or None if no row came back
        """
        return self._fetch_first(
            "match(n)-[:actedin]-(m) where n.name =$name return count(*)",
            "count(*)",
            {"name": result[1]},
        )

    def getActorBirth(self, result):
        """Template 13 (``nnt 出生日期``): birth date of an actor.

        :param result: analysis result; ``result[1]`` is the actor name
        :return: the person's ``birth`` property, or None if not found
        """
        return self._fetch_first(
            "match(n:Person) where n.name=$name return n.birth",
            "n.birth",
            {"name": result[1]},
        )


if __name__ == '__main__':
    # The vocabulary must be loaded before load_data(), which vectorises the
    # training sentences with it.
    questions_pattern = load_question_templates()
    vocabulary = load_vocabulary()
    X, Y = load_data()
    clf = MultinomialNB()
    clf.fit(X, Y)
    question_service = QuestionService()
    # Other sample questions to try:
    #   "英雄這部電影主要講什么"
    #   "我的父親母親這部電影的分數是多少"
    #   "我的父親母親有哪些人出演"
    #   "周星馳演過那些類型的電影"
    print(question_service.answer("周星馳演過喜劇片有那些"))
英雄 nm 11
我的父親母親 nm 11
喜劇片 ng 11
參考地址
基於電影知識圖譜的智能問答系統(一) -- MySQL數據準備
基於電影知識圖譜的智能問答系統(二) -- Neo4j導入CSV文件
基於電影知識圖譜的智能問答系統(三) -- Spark環境搭建
基於電影知識圖譜的智能問答系統(五) -- Spark樸素貝葉斯分類器
基於電影知識圖譜的智能問答系統(六) -- 問題訓練樣本集敲定