基於電影知識圖譜的智能問答系統


 

Python 實現

import os
from py2neo import Graph
from pyhanlp import *
from sklearn.naive_bayes import MultinomialNB

# Maps each abstract tag (e.g. "nm", "nnt") to the concrete word it replaced
# in the question currently being analysed.
abstractMap = {}
# word -> feature index. It must be a dict, not a list: sentence_to_arrays
# reads it through .keys() and .get().
vocabulary = {}
# template index -> template text; assigned by the script entry point.
questions_pattern = None


def sentence_to_arrays(sentence):
    """Encode a sentence as a binary bag-of-words vector.

    :param sentence: text that is segmented with ``HanLP.segment``
    :return: list with one slot per entry of the global ``vocabulary``; the
        slot at a word's vocabulary index is 1 when that word occurs in the
        sentence and 0 otherwise
    """
    global vocabulary
    size = len(vocabulary.keys())
    encoded = [0 for _ in range(size)]
    for token in HanLP.segment(sentence):
        # Words outside the vocabulary contribute nothing to the vector.
        if token.word not in vocabulary:
            continue
        slot = vocabulary.get(token.word)
        encoded[slot] = 1
    return encoded


def query_abstract(query_sentence):
    """Replace the key words of a question with abstract part-of-speech tags.

    The sentence is segmented with HanLP. Terms whose nature marks them as a
    movie title (``nm``), a person name (``nr``), a rating (``x``) or a genre
    (``ng``) are replaced by a tag, and the original word is remembered in the
    global ``abstractMap`` so ``sentenceReduction`` can restore it later. The
    first person name becomes ``nnt`` and the second ``nnr``; any further
    person names are kept as plain words.

    :param query_sentence: the question to abstract
    :return: the abstracted sentence; every token is followed by one space
    """
    global abstractMap
    # Drop slots left behind by an earlier question whose analysis aborted
    # before sentenceReduction could reset the map.
    abstractMap.clear()

    pieces = []
    person_count = 0
    for term in HanLP.segment(query_sentence):
        print(term)
        # Test the nature only. str(term) also contains the word itself, so a
        # word that merely contains "x" or "nm" in its text would be tagged.
        nature = str(term.nature)
        tag = None
        if "nm" in nature:  # nm: movie title
            tag = "nm"
        elif "nr" in nature and person_count == 0:  # nr: first person name
            tag = "nnt"
            person_count += 1
        elif "nr" in nature and person_count == 1:  # nr: second person name
            tag = "nnr"
            person_count += 1
        elif "x" in nature:  # x: rating
            tag = "x"
        elif "ng" in nature:  # ng: genre
            tag = "ng"

        if tag is None:
            pieces.append(term.word)
        else:
            abstractMap[tag] = term.word
            pieces.append(tag)
    print("========HanLP分詞結束========")
    return "".join(piece + " " for piece in pieces)


def query_classify(sentence):
    """Predict which question template an abstracted sentence belongs to.

    :param sentence: abstracted sentence, vectorised with
        ``sentence_to_arrays``
    :return: tuple ``(template, index)`` where ``index`` is the class
        predicted by the global ``clf`` and ``template`` is
        ``questions_pattern.get(index)`` (``None`` when the index is unknown)
    """
    global clf
    features = sentence_to_arrays(sentence)
    predictions = clf.predict([features])
    index = int(predictions[0])
    print("the model index is " + str(index))
    template = questions_pattern.get(index)
    return template, index


def sentenceReduction(query_pattern):
    """Fill a question template with the words recorded in ``abstractMap``.

    Every occurrence of a recorded tag (e.g. ``nm``, ``nnt``) in the template
    is replaced by the word that tag stood for. The global ``abstractMap`` is
    reset to an empty dict afterwards.

    :param query_pattern: template text such as ``"nm 評分"``
    :return: the template with its tags replaced by the original words
    """
    import re

    global abstractMap
    slots, abstractMap = abstractMap, {}
    if not slots:
        return query_pattern

    # Substitute in a single pass. Chained str.replace calls would let a later
    # tag rewrite text that an earlier substitution inserted, e.g. the "x"
    # inside a movie title. Longest tags come first so a tag is never
    # shadowed by a shorter one that is its prefix.
    tags = sorted(slots, key=len, reverse=True)
    matcher = re.compile("|".join(re.escape(tag) for tag in tags))
    return matcher.sub(lambda match: slots[match.group(0)], query_pattern)


def analysis(query_sentence):
    """Turn a natural-language question into a template index plus arguments.

    Steps: abstract the key words with ``query_abstract``, classify the
    abstracted sentence with ``query_classify``, then put the original words
    back into the matched template with ``sentenceReduction``. Each stage is
    printed.

    :param query_sentence: the original question
    :return: list whose first element is the template index, followed by the
        filled-in template split on single spaces
    """
    print("原始句子:" + query_sentence)
    print("========HanLP開始分詞========")

    # Stage 1: replace key words by their abstract tags.
    abstracted = query_abstract(query_sentence)
    print("句子抽象化結果:" + abstracted)

    # Stage 2: match the abstracted sentence against the trained templates.
    template, model_index = query_classify(abstracted)
    print("句子套用模板結果:" + template)

    # Stage 3: restore the concrete words inside the matched template.
    restored = sentenceReduction(template)
    print("原始句子替換成系統可識別的結果:" + restored)

    return [model_index, *restored.split(" ")]


def load_vocabulary(path="question/vocabulary.txt", encoding="utf-8"):
    """Load the word -> feature-index table used to vectorise sentences.

    Each non-blank line has the form ``index:word``. Only the first colon
    separates the two fields, so a word may itself contain ``:``.

    :param path: vocabulary file; defaults to the location originally
        hard-coded here
    :param encoding: text encoding of the file; stated explicitly so the
        result does not depend on the platform's default encoding
    :return: dict mapping each word to its integer index
    :raises ValueError: if a non-blank line has no ``:`` or its index is not
        an integer
    """
    vocabulary = {}
    with open(path, "r", encoding=encoding) as f:
        for line_no, line in enumerate(f, start=1):
            line = line.strip()
            if not line:
                # Tolerate blank lines, e.g. a trailing newline at end of file.
                continue
            index_text, separator, word = line.partition(":")
            if not separator:
                raise ValueError(
                    f"{path}:{line_no}: expected 'index:word', got {line!r}"
                )
            vocabulary[word.strip()] = int(index_text)
    return vocabulary


def load_data(path="question", encoding="utf-8"):
    """Build the training set from the question sample files under ``path``.

    Sample files are named like ``【3】劇情.txt``: the number between the
    brackets is the class label, and every non-blank line of the file is one
    training sentence for that class. Files without a ``】`` in their name
    are ignored.

    :param path: directory that is walked recursively
    :param encoding: text encoding of the sample files
    :return: tuple ``(X, Y)`` where ``X`` holds the sentence vectors produced
        by ``sentence_to_arrays`` and ``Y`` the matching integer labels
    """
    X = []
    Y = []
    for root, _dirs, files in os.walk(path):
        for f_name in files:
            # The bracket characters were missing from the original code,
            # which made str.split("") raise "empty separator" on every file.
            if "】" not in f_name:
                continue
            value = f_name.split("】")[0].replace("【", "")
            with open(os.path.join(root, f_name), encoding=encoding) as f:
                for sentence in f:
                    sentence = sentence.strip()
                    if not sentence:
                        # A blank line is not a question; skipping it avoids
                        # training on an all-zero vector.
                        continue
                    X.append(sentence_to_arrays(sentence))
                    Y.append(int(value))
    return X, Y


def load_question_templates(path="question/question_classification.txt",
                            encoding="utf-8"):
    """Load the template-index -> template-text table.

    Each non-blank line has the form ``index:template``. Only the first colon
    separates the two fields, so the template text may itself contain ``:``.

    :param path: template file; defaults to the location originally
        hard-coded here
    :param encoding: text encoding of the file; stated explicitly so the
        result does not depend on the platform's default encoding
    :return: dict mapping each integer index to its template text
    :raises ValueError: if a non-blank line has no ``:`` or its index is not
        an integer
    """
    questions_pattern = {}
    with open(path, "r", encoding=encoding) as f:
        for line_no, line in enumerate(f, start=1):
            line = line.strip()
            if not line:
                # Tolerate blank lines, e.g. a trailing newline at end of file.
                continue
            index_text, separator, template = line.partition(":")
            if not separator:
                raise ValueError(
                    f"{path}:{line_no}: expected 'index:template', got {line!r}"
                )
            questions_pattern[int(index_text)] = template.strip()
    return questions_pattern


class QuestionService(object):
    """Answer movie questions by querying a Neo4j graph through py2neo.

    ``answer`` classifies the question with ``analysis`` and dispatches on the
    returned template index to one of the ``get*`` methods. Unless noted
    otherwise, a ``get*`` method receives the ``analysis`` result list: the
    template index followed by the space-separated tokens of the filled-in
    template.

    All user-derived values are sent to Neo4j as query parameters instead of
    being pasted into the Cypher text, so quotes in a name or title can
    neither break nor alter the query.
    """

    # Reply returned when no handler produced an answer.
    NOT_FOUND = "sorry,小主,我沒有找到你要的答案"

    def __init__(self, host="127.0.0.1", http_port=7474, user="neo4j",
                 password="password"):
        """Connect to Neo4j.

        The defaults are the connection settings that were originally
        hard-coded here.
        """
        self.g = Graph(
            host=host,
            http_port=http_port,
            user=user,
            password=password
        )

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------
    def _values(self, query, key, **params):
        """Run ``query`` with ``params``; return the non-null ``key`` values."""
        rows = self.g.run(query, params).data()
        return [row[key] for row in rows if row.get(key) is not None]

    def _first(self, query, key, **params):
        """Return the first ``key`` value of the result, or None if empty."""
        values = self._values(query, key, **params)
        return values[0] if values else None

    def _joined(self, query, key, **params):
        """Return all ``key`` values joined by commas, or None if empty."""
        values = self._values(query, key, **params)
        return ",".join(str(value) for value in values) if values else None

    @staticmethod
    def _score_from(result):
        """Return the first numeric token after the name, or None.

        The original code read the score from ``result[2]`` in one method and
        from ``result[1]`` (the name) in the other, while the docstrings'
        "1 2 3 4" position note places it at index 4. Scanning for the first
        token that parses as a number works for either template layout.
        """
        for token in result[2:]:
            try:
                return float(token)
            except (TypeError, ValueError):
                continue
        return None

    def _titles_by_score(self, result, operator):
        """Titles of the actor's movies whose rating is ``operator`` the score.

        ``operator`` is always an internal literal (``">"`` or ``"<"``), never
        user input.
        """
        score = self._score_from(result)
        if score is None:
            return None
        # toFloat() makes the comparison numeric whether the rating is stored
        # as a number or as text. NOTE(review): confirm how m.rating is stored.
        query = (
            "match(n:Person)-[:actedin]-(m:Movie) where n.name=$name "
            f"and toFloat(m.rating) {operator} $score return m.title"
        )
        return self._joined(query, "m.title", name=result[1], score=score)

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------
    def answer(self, question) -> str:
        """Answer a natural-language question.

        :param question: the question text
        :return: the answer as a string, or the ``NOT_FOUND`` reply when the
            template index has no handler or the graph returned nothing
        """
        result = analysis(question)
        model_index = result[0]

        handlers = {
            0: self.getMovieRating,
            1: self.getMovieReleaseDate,
            2: self.getMovieTypes,
            # getMovieInfo takes the title itself, not the result list.
            3: lambda res: self.getMovieInfo(res[1]),
            4: self.getMovieActors,
            5: self.getActorInfo,
            6: self.getMoviesByType,
            7: self.getActorMovies,
            8: self.getActorMoviesByHScore,
            9: self.getActorMoviesByLScore,
            10: self.getActorMoviesType,
            # NOTE(review): index 11 was already mapped to getActorMovies;
            # confirm whether that template needs a dedicated query.
            11: self.getActorMovies,
            12: self.getMoviesCount,
            13: self.getActorBirth,
        }
        handler = handlers.get(model_index)
        value = handler(result) if handler is not None else None

        if isinstance(value, (list, tuple)):
            value = ",".join(str(item) for item in value)
        # Compare with None/"" explicitly: a count of 0 or a rating of 0.0 is
        # a real answer and must not be reported as "not found".
        if value is None or value == "":
            return self.NOT_FOUND
        return str(value)

    def getMovieRating(self, result):
        """Template 0 (nm rating): rating of the movie titled ``result[1]``.

        :return: the ``n.rating`` value, or None if the movie is unknown
        """
        query = "match(n:Movie) where n.title=$title return n.rating"
        return self._first(query, "n.rating", title=result[1])

    def getMovieReleaseDate(self, result):
        """Template 1 (nm release date): release date of ``result[1]``.

        :return: the ``n.releasedate`` value, or None if the movie is unknown
        """
        query = "match(n:Movie) where n.title=$title return n.releasedate"
        return self._first(query, "n.releasedate", title=result[1])

    def getMovieTypes(self, result):
        """Template 2 (nm genre): genres of the movie titled ``result[1]``.

        :return: comma-joined genre names, or None if there are none
        """
        query = (
            "match(n:Movie)-[r:is]->(b:Genre) where n.title=$title "
            "return b.name"
        )
        return self._joined(query, "b.name", title=result[1])

    def getMovieInfo(self, title):
        """Template 3 (nm introduction): introduction text of a movie.

        :param title: movie title (not the ``analysis`` result list)
        :return: the ``n.introduction`` value, or None if the movie is unknown
        """
        query = "match(n:Movie) where n.title=$title return n.introduction"
        return self._first(query, "n.introduction", title=title)

    def getMovieActors(self, result):
        """Template 4 (nm cast): people who acted in the movie ``result[1]``.

        :return: comma-joined person names, or None if there are none
        """
        query = (
            "match(n:Person)-[:actedin]-(m:Movie) where m.title=$title "
            "return n.name"
        )
        return self._joined(query, "n.name", title=result[1])

    def getActorInfo(self, result):
        """Template 5 (nnt introduction): birthplace of the person ``result[1]``.

        :return: the ``n.birthplace`` value, or None if the person is unknown
        """
        query = "match(n:Person) where n.name=$name return n.birthplace"
        return self._first(query, "n.birthplace", name=result[1])

    def getMoviesByType(self, result):
        """Template 6 (nnt ng movies): the actor's movies of a given genre.

        ``result[1]`` is the actor name and ``result[2]`` the genre name,
        which is matched with the Cypher regex operator ``=~``.

        :return: comma-joined distinct titles, or None if there are none or
            the genre token is missing
        """
        if len(result) < 3:
            return None
        query = (
            "match(n:Person)-[:actedin]-(m:Movie) where n.name=$name "
            "match(g:Genre)-[:is]-(m) where g.name=~$gname "
            "return distinct m.title"
        )
        return self._joined(query, "m.title", name=result[1], gname=result[2])

    def getActorMovies(self, result):
        """Template 7 (nnt movies): movies the person ``result[1]`` acted in.

        :return: list of titles (empty when nothing is found); ``answer``
            joins it with commas
        """
        query = (
            "match(n:Person)-[:actedin]->(m:Movie) where n.name=$name "
            "return m.title"
        )
        return self._values(query, "m.title", name=result[1])

    def getActorMoviesByHScore(self, result):
        """Template 8 (nnt rating greater than x): movies rated above x.

        ``result[1]`` is the actor name; the score is the first numeric token
        that follows it.

        :return: comma-joined titles, or None if there are none or no score
            token was found
        """
        return self._titles_by_score(result, ">")

    def getActorMoviesByLScore(self, result):
        """Template 9 (nnt rating less than x): movies rated below x.

        ``result[1]`` is the actor name; the score is the first numeric token
        that follows it.

        :return: comma-joined titles, or None if there are none or no score
            token was found
        """
        return self._titles_by_score(result, "<")

    def getActorMoviesType(self, result):
        """Template 10 (nnt genres): genres of the movies ``result[1]`` acted in.

        :return: comma-joined distinct genre names, or None if there are none
        """
        query = (
            "match(n:Person)-[:actedin]-(m:Movie) where n.name=$name "
            "match(p:Genre)-[:is]-(m) return distinct p.name"
        )
        return self._joined(query, "p.name", name=result[1])

    def getMoviesCount(self, result):
        """Template 12 (nnt movie count): number of ``actedin`` relationships.

        :return: the ``count(*)`` value of the query for the node named
            ``result[1]``
        """
        query = "match(n)-[:actedin]-(m) where n.name=$name return count(*)"
        return self._first(query, "count(*)", name=result[1])

    def getActorBirth(self, result):
        """Template 13 (nnt birth date): birth date of the person ``result[1]``.

        :return: the ``n.birth`` value, or None if the person is unknown
        """
        query = "match(n:Person) where n.name=$name return n.birth"
        return self._first(query, "n.birth", name=result[1])


if __name__ == "__main__":
    # The templates and the vocabulary are loaded first: load_data vectorises
    # each sentence with sentence_to_arrays, which reads the global vocabulary.
    questions_pattern = load_question_templates()
    vocabulary = load_vocabulary()

    # Train the naive Bayes question classifier; fit() returns the estimator.
    X, Y = load_data()
    clf = MultinomialNB().fit(X, Y)

    question_service = QuestionService()
    # Other sample questions that can be tried:
    #   "英雄這部電影主要講什么"
    #   "我的父親母親這部電影的分數是多少"
    #   "我的父親母親有哪些人出演"
    #   "周星馳演過那些類型的電影"
    print(question_service.answer("周星馳演過喜劇片有那些"))

pyhanlp 自定義字典

英雄 nm 11
我的父親母親 nm 11
喜劇片 ng 11

參考地址

基於電影知識圖譜的智能問答系統(一) -- MySQL數據準備

基於電影知識圖譜的智能問答系統(二) -- Neo4j導入CSV文件

基於電影知識圖譜的智能問答系統(三) -- Spark環境搭建

基於電影知識圖譜的智能問答系統(四) --HanLP分詞器

基於電影知識圖譜的智能問答系統(五) --Spark樸素貝葉斯分類器

基於電影知識圖譜的智能問答系統(六) -- 問題訓練樣本集敲定

基於電影知識圖譜的智能問答系統(七) -- Neo4j語句那點事

基於電影知識圖譜的智能問答系統(八) -- 終極完結篇


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯繫本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM