基于电影知识图谱的智能问答系统


 

Python 实现

import os
from py2neo import Graph
from pyhanlp import *
from sklearn.naive_bayes import MultinomialNB

# Placeholder -> original word captured while abstracting the current question
# (filled by query_abstract, consumed and reset by sentenceReduction).
abstractMap = {}
# Word -> feature index. This must be a mapping, not a list: sentence_to_arrays
# calls .keys() and .get() on it. Replaced by load_vocabulary() at start-up.
vocabulary = {}
# Class index -> question template; replaced by load_question_templates().
questions_pattern = None


def sentence_to_arrays(sentence):
    """Encode a sentence as a 0/1 vector over the module-level vocabulary.

    The sentence is segmented with HanLP; every segmented word that is a key
    of ``vocabulary`` sets the vector slot at that word's index to 1.

    :param sentence: sentence to encode
    :return: list of ints, one slot per vocabulary entry
    """
    global vocabulary
    encoded = [0] * len(vocabulary.keys())
    words = (term.word for term in HanLP.segment(sentence))
    for word in words:
        if word not in vocabulary:
            continue
        encoded[vocabulary.get(word)] = 1
    return encoded


def query_abstract(query_sentence):
    """Replace the key words of a question with abstract placeholders.

    The sentence is segmented with HanLP and each term is classified by its
    part-of-speech tag:

    * tag contains ``nm`` -> placeholder ``nm`` (movie title)
    * tag contains ``nr`` -> ``nnt`` for the first person name, ``nnr`` for
      the second; further person names are not abstracted by this branch
    * tag contains ``x``  -> placeholder ``x`` (rating)
    * tag contains ``ng`` -> placeholder ``ng`` (genre)

    Every other term is kept as its original word. The word behind each
    placeholder is stored in the module-level ``abstractMap`` so that
    ``sentenceReduction`` can restore it later.

    The tag test looks only at ``term.nature``. Matching against the whole
    ``word/tag`` rendering of the term would also react to letters inside the
    word itself (for example an "x" in a title).

    :param query_sentence: the question to abstract
    :return: the abstracted sentence; every token is followed by one space
    """
    global abstractMap
    person_slots = ("nnt", "nnr")
    person_count = 0
    pieces = []
    for term in HanLP.segment(query_sentence):
        print(term)
        # Only the tag decides the category, never the word text.
        nature = str(term.nature)
        if "nm" in nature:
            slot = "nm"
        elif "nr" in nature and person_count < len(person_slots):
            slot = person_slots[person_count]
            person_count += 1
        elif "x" in nature:
            slot = "x"
        elif "ng" in nature:
            slot = "ng"
        else:
            slot = None

        if slot is None:
            pieces.append(term.word)
        else:
            abstractMap[slot] = term.word
            pieces.append(slot)
    print("========HanLP分词结束========")
    # Keep the trailing space after every token, as downstream code expects.
    return "".join(piece + " " for piece in pieces)


def query_classify(sentence):
    """Predict which question template an abstracted sentence belongs to.

    The sentence is vectorised with ``sentence_to_arrays`` and passed to the
    module-level classifier ``clf``.

    :param sentence: abstracted sentence
    :return: tuple ``(template, index)`` where ``template`` is looked up in
        ``questions_pattern`` (``None`` when the index has no entry)
    """
    features = sentence_to_arrays(sentence)
    predicted = clf.predict([features])
    label = int(predicted[0])
    print(f"the model index is {label}")
    template = questions_pattern.get(label)
    return template, label


def sentenceReduction(query_pattern):
    """Turn a template back into a concrete sentence.

    Every placeholder recorded in the module-level ``abstractMap`` is replaced
    by the original word it stands for. ``abstractMap`` is reset to an empty
    dict afterwards so the next question starts clean.

    All placeholders are substituted in a single pass, so text that has just
    been inserted is never scanned again. With one ``str.replace`` per key, a
    movie title containing "x" would be damaged by the later ``x`` (rating)
    replacement.

    :param query_pattern: template sentence containing placeholders
    :return: the template with placeholders replaced by the original words
    """
    import re  # local import keeps this block self-contained

    global abstractMap
    replacements = abstractMap
    abstractMap = {}

    # Empty keys would match at every position; ignore them.
    keys = [key for key in replacements if key]
    if not keys:
        return query_pattern

    # Longest key first so a longer placeholder wins over a shorter prefix.
    keys.sort(key=len, reverse=True)
    matcher = re.compile("|".join(re.escape(key) for key in keys))
    return matcher.sub(lambda m: replacements[m.group(0)], query_pattern)


def analysis(query_sentence):
    """Run the full question-understanding pipeline for one question.

    Steps: abstract the key words, classify the abstracted sentence to find
    its template, then substitute the original words back into the template.

    :param query_sentence: the raw question
    :return: list whose first item is the template index, followed by the
        space-separated tokens of the restored template
    """
    # Raw question.
    print("原始句子:" + query_sentence)
    print("========HanLP开始分词========")

    # Replace key words by their abstract placeholders.
    abstracted = query_abstract(query_sentence)
    print("句子抽象化结果:" + abstracted)

    # Match the abstracted sentence against the trained templates.
    template, model_index = query_classify(abstracted)
    print("句子套用模板结果:" + template)

    # Put the original words back into the template.
    restored = sentenceReduction(template)
    print("原始句子替换成系统可识别的结果:" + restored)

    return [model_index] + restored.split(" ")


def load_vocabulary(path="question/vocabulary.txt", encoding="utf-8-sig"):
    """Load the word -> feature-index vocabulary.

    Each non-blank line of the file has the form ``index:word``.

    :param path: vocabulary file; defaults to the location used by the script
    :param encoding: text encoding of the file. It is given explicitly so the
        Chinese words are not decoded with a platform-dependent default;
        ``utf-8-sig`` also accepts files that start with a BOM.
    :return: dict mapping each word to its integer index
    :raises ValueError: if a non-blank line has no ``:`` or a non-integer index
    """
    vocabulary = {}
    with open(path, "r", encoding=encoding) as f:
        for line_no, line in enumerate(f, start=1):
            if not line.strip():
                continue  # tolerate blank lines
            index_text, sep, word = line.partition(":")
            if not sep:
                raise ValueError(
                    f"{path}:{line_no}: expected 'index:word', got {line!r}"
                )
            vocabulary[word.strip()] = int(index_text)
    return vocabulary


def load_data(path="question", encoding="utf-8-sig"):
    """Load the training sentences and their class labels.

    Walks ``path`` and reads every file whose name carries a bracketed class
    index, e.g. ``【3】剧情.txt``. Each line of such a file is one training
    sentence for that class. Files without the closing bracket are skipped.

    :param path: directory containing the training files
    :param encoding: text encoding of the training files, given explicitly so
        the Chinese text is not decoded with a platform-dependent default
    :return: tuple ``(X, Y)``; ``X`` holds one vector per sentence as produced
        by ``sentence_to_arrays``, ``Y`` the matching integer labels
    :raises ValueError: if the text in front of ``】`` is not an integer
    """
    X = []
    Y = []
    for root, _dirs, files in os.walk(path):
        for f_name in files:
            # "【3】剧情.txt" -> label_text "【3", closing "】"
            label_text, closing, _rest = f_name.partition("】")
            if not closing:
                continue
            label = int(label_text.replace("【", ""))
            with open(os.path.join(root, f_name), encoding=encoding) as f:
                for sentence in f:
                    X.append(sentence_to_arrays(sentence.strip()))
                    Y.append(label)
    return X, Y


def load_question_templates(path="question/question_classification.txt",
                            encoding="utf-8-sig"):
    """Load the class-index -> question-template table.

    Each non-blank line of the file has the form ``index:template``.

    :param path: template file; defaults to the location used by the script
    :param encoding: text encoding of the file. It is given explicitly so the
        Chinese templates are not decoded with a platform-dependent default;
        ``utf-8-sig`` also accepts files that start with a BOM.
    :return: dict mapping each integer index to its template string
    :raises ValueError: if a non-blank line has no ``:`` or a non-integer index
    """
    questions_pattern = {}
    with open(path, "r", encoding=encoding) as f:
        for line_no, line in enumerate(f, start=1):
            if not line.strip():
                continue  # tolerate blank lines
            index_text, sep, template = line.partition(":")
            if not sep:
                raise ValueError(
                    f"{path}:{line_no}: expected 'index:template', got {line!r}"
                )
            questions_pattern[int(index_text)] = template.strip()
    return questions_pattern


class QuestionService(object):
    """Answer movie questions by querying a Neo4j knowledge graph.

    ``answer`` analyses the question, picks the handler that belongs to the
    predicted template index and turns the handler's result into text.

    All Cypher statements pass user-derived text as query parameters rather
    than interpolating it into the statement, so quotes in a title or name
    can neither break nor alter the query.
    """

    # Reply used when no handler produced an answer.
    NO_ANSWER = "sorry,小主,我没有找到你要的答案"

    def __init__(self, host="127.0.0.1", http_port=7474, user="neo4j",
                 password="password"):
        """Open the graph connection.

        The defaults reproduce the connection settings the script has always
        used; pass other values to target a different server.
        """
        self.g = Graph(
            host=host,
            http_port=http_port,
            user=user,
            password=password
        )

    # ------------------------------------------------------------------
    # query helpers
    # ------------------------------------------------------------------
    def _fetch_column(self, query, column, parameters):
        """Run ``query`` and return the non-null values of ``column``."""
        rows = self.g.run(query, parameters).data()
        values = (row.get(column) for row in rows)
        return [value for value in values if value is not None]

    def _fetch_first(self, query, column, parameters):
        """Return the first value of ``column`` or None when there is none."""
        values = self._fetch_column(query, column, parameters)
        return values[0] if values else None

    def _fetch_joined(self, query, column, parameters):
        """Return all values of ``column`` joined by ',' or None if empty."""
        values = self._fetch_column(query, column, parameters)
        if not values:
            return None
        return ",".join(str(value) for value in values)

    @staticmethod
    def _parse_score(result):
        """Return the rating threshold of a score question as a float.

        The score templates end with the rating token
        ("nnt 参演评分 大于 x"), so the threshold is the last token of
        ``result``. Returns None when it is missing or not a number.
        """
        if len(result) < 3:
            return None
        try:
            return float(result[-1])
        except (TypeError, ValueError):
            return None

    # ------------------------------------------------------------------
    # entry point
    # ------------------------------------------------------------------
    def answer(self, question) -> str:
        """Answer a natural-language question.

        :param question: the raw question text
        :return: the answer as text, or ``NO_ANSWER`` when the template index
            is unknown or the graph holds no matching data
        """
        result = analysis(question)
        model_index = result[0]

        handlers = {
            0: self.getMovieRating,
            1: self.getMovieReleaseDate,
            2: self.getMovieTypes,
            # "nm 简介": the handler takes the title itself
            3: lambda parsed: self.getMovieInfo(parsed[1]),
            4: self.getMovieActors,
            5: self.getActorInfo,
            6: self.getMoviesByType,
            7: self.getActorMovies,
            8: self.getActorMoviesByHScore,
            9: self.getActorMoviesByLScore,
            10: self.getActorMoviesType,
            11: self.getActorMovies,
            12: self.getMoviesCount,
            13: self.getActorBirth,
        }
        handler = handlers.get(model_index)
        found = handler(result) if handler is not None else None

        # Compare against "nothing" explicitly: a rating or count of 0 is a
        # real answer and must not be reported as "not found".
        if found is None or found == "" or found == []:
            return self.NO_ANSWER
        if isinstance(found, (list, tuple)):
            return ",".join(str(item) for item in found)
        return str(found)

    # ------------------------------------------------------------------
    # handlers, one per question template
    # ------------------------------------------------------------------
    def getMovieRating(self, result):
        """Template 0, "nm 评分": rating of a movie.

        :param result: analysis result; ``result[1]`` is the movie title
        :return: the movie's rating, or None if the movie is unknown
        """
        query = "match(n:Movie) where n.title = $title return n.rating"
        return self._fetch_first(query, "n.rating", {"title": result[1]})

    def getMovieReleaseDate(self, result):
        """Template 1, "nm 上映时间": release date of a movie.

        :param result: analysis result; ``result[1]`` is the movie title
        :return: the release date, or None if the movie is unknown
        """
        query = "match(n:Movie) where n.title = $title return n.releasedate"
        return self._fetch_first(query, "n.releasedate", {"title": result[1]})

    def getMovieTypes(self, result):
        """Template 2, "nm 类型": genres of a movie.

        :param result: analysis result; ``result[1]`` is the movie title
        :return: all genre names joined by ',', or None if there are none
        """
        query = ("match(n:Movie)-[r:is]->(b:Genre) where n.title = $title "
                 "return b.name")
        return self._fetch_joined(query, "b.name", {"title": result[1]})

    def getMovieInfo(self, title):
        """Template 3, "nm 简介": introduction of a movie.

        :param title: movie title
        :return: the introduction text, or None if the movie is unknown
        """
        query = "match(n:Movie) where n.title = $title return n.introduction"
        return self._fetch_first(query, "n.introduction", {"title": title})

    def getMovieActors(self, result):
        """Template 4, "nm 演员列表": actors of a movie.

        :param result: analysis result; ``result[1]`` is the movie title
        :return: the actor names joined by ',', or None if there are none
        """
        query = ("match(n:Person)-[:actedin]-(m:Movie) where m.title = $title "
                 "return n.name")
        return self._fetch_joined(query, "n.name", {"title": result[1]})

    def getActorInfo(self, result):
        """Template 5, "nnt 介绍": information about an actor.

        The query returns the ``birthplace`` property of the person.

        :param result: analysis result; ``result[1]`` is the actor name
        :return: the birthplace, or None if the actor is unknown
        """
        query = "match(n:Person) where n.name = $name return n.birthplace"
        return self._fetch_first(query, "n.birthplace", {"name": result[1]})

    def getMoviesByType(self, result):
        """Template 6, "nnt ng 电影作品": an actor's movies of a given genre.

        The genre is matched with the Cypher regex operator ``=~``.

        :param result: analysis result; ``result[1]`` is the actor name and
            ``result[2]`` the genre name
        :return: the distinct movie titles joined by ',', or None if none
        """
        query = ("match(n:Person)-[:actedin]-(m:Movie) where n.name = $name "
                 "match(g:Genre)-[:is]-(m) where g.name =~ $gname "
                 "return distinct m.title")
        parameters = {"name": result[1], "gname": result[2]}
        return self._fetch_joined(query, "m.title", parameters)

    def getActorMovies(self, result):
        """Templates 7 and 11, "nnt 电影作品": movies an actor played in.

        :param result: analysis result; ``result[1]`` is the actor name
        :return: list of movie titles (empty when nothing is found); ``answer``
            joins it with ','
        """
        query = ("match(n:Person)-[:actedin]->(m:Movie) where n.name = $name "
                 "return m.title")
        return self._fetch_column(query, "m.title", {"name": result[1]})

    def _actor_movies_by_score(self, result, comparison):
        """Shared body of the two score handlers.

        :param comparison: the Cypher comparison operator, '>' or '<'; it is
            chosen by this class and never derived from user input
        """
        score = self._parse_score(result)
        if score is None:
            return None
        # toFloat() makes the comparison numeric whether the rating is stored
        # as a number or as text.
        query = ("match(n:Person)-[:actedin]-(m:Movie) where n.name = $name "
                 "and toFloat(m.rating) " + comparison + " $score "
                 "return m.title")
        parameters = {"name": result[1], "score": score}
        return self._fetch_joined(query, "m.title", parameters)

    def getActorMoviesByHScore(self, result):
        """Template 8, "nnt 参演评分 大于 x": an actor's movies rated above x.

        :param result: analysis result; ``result[1]`` is the actor name and
            the last token the rating threshold
        :return: the movie titles joined by ',', or None if none
        """
        return self._actor_movies_by_score(result, ">")

    def getActorMoviesByLScore(self, result):
        """Template 9, "nnt 参演评分 小于 x": an actor's movies rated below x.

        :param result: analysis result; ``result[1]`` is the actor name and
            the last token the rating threshold
        :return: the movie titles joined by ',', or None if none
        """
        return self._actor_movies_by_score(result, "<")

    def getActorMoviesType(self, result):
        """Template 10, "nnt 电影类型": genres of the movies an actor played in.

        :param result: analysis result; ``result[1]`` is the actor name
        :return: the distinct genre names joined by ',', or None if none
        """
        query = ("match(n:Person)-[:actedin]-(m:Movie) where n.name = $name "
                 "match(p:Genre)-[:is]-(m) return distinct p.name")
        return self._fetch_joined(query, "p.name", {"name": result[1]})

    def getMoviesCount(self, result):
        """Template 12, "nnt 电影数量": number of movies an actor played in.

        :param result: analysis result; ``result[1]`` is the actor name
        :return: the number of ``actedin`` relationships found for that name
        """
        query = ("match(n)-[:actedin]-(m) where n.name = $name "
                 "return count(*) as total")
        return self._fetch_first(query, "total", {"name": result[1]})

    def getActorBirth(self, result):
        """Template 13, "nnt 出生日期": birth date of an actor.

        :param result: analysis result; ``result[1]`` is the actor name
        :return: the ``birth`` property, or None if the actor is unknown
        """
        query = "match(n:Person) where n.name = $name return n.birth"
        return self._fetch_first(query, "n.birth", {"name": result[1]})


if __name__ == '__main__':
    # Script entry point: fill the module-level state the helper functions
    # read (questions_pattern, vocabulary, clf), then ask one sample question.
    questions_pattern = load_question_templates()
    vocabulary = load_vocabulary()
    # load_data() vectorises every sentence through sentence_to_arrays(),
    # which reads the vocabulary global, so it must run after the line above.
    X, Y = load_data()
    # Train the naive Bayes classifier that query_classify() uses as `clf`.
    clf = MultinomialNB()
    clf.fit(X, Y)

    question_service = QuestionService()
    # Further sample questions, disabled:
    # print(question_service.answer("英雄这部电影主要讲什么"))
    # print(question_service.answer("我的父亲母亲这部电影的分数是多少"))
    # print(question_service.answer("我的父亲母亲有哪些人出演"))
    # print(question_service.answer("周星驰演过那些类型的电影"))
    print(question_service.answer("周星驰演过喜剧片有那些"))

pyhanlp 自定义字典

英雄 nm 11
我的父亲母亲 nm 11
喜剧片 ng 11

参考地址

基于电影知识图谱的智能问答系统(一) -- Mysql数据准备

基于电影知识图谱的智能问答系统(二) -- Neo4j导入CSV文件

基于电影知识图谱的智能问答系统(三) -- Spark环境搭建

基于电影知识图谱的智能问答系统(四) --HanLP分词器

基于电影知识图谱的智能问答系统(五) --Spark朴素贝叶斯分类器

基于电影知识图谱的智能问答系统(六) -- 问题训练样本集敲定

基于电影知识图谱的智能问答系统(七) -- Neo4j语句那点事

基于电影知识图谱的智能问答系统(八) -- 终极完结篇


免责声明!

本站转载的文章为个人学习借鉴使用,本站对版权不负任何法律责任。如果侵犯了您的隐私权益,请联系本站邮箱yoyou2525@163.com删除。



 
粤ICP备18138465号  © 2018-2025 CODEPRJ.COM