本節目錄
- 常用函數一:詞頻統計
- 常用函數二:word2vec
- 常用函數三:doc2vec
- 常用函數四:LDA主題分析
常用函數一:詞頻統計
# -*- coding: utf-8 -*-
"""
Datetime: 2020/06/25
Author: Zhang Yafei
Description: Word-frequency statistics.

Input:  Excel file name, column name, separator
Output: word-frequency table written to an Excel file
"""
from collections import Counter

import pandas as pd


def _word_freq_frame(df, col_name, sep):
    """Count the words of one column and return them as a DataFrame.

    :param df: DataFrame holding the column to analyse
    :param col_name: name of the column whose cells contain ``sep``-joined words
    :param sep: separator passed to ``Series.str.split``
    :return: DataFrame with columns ``word`` and ``freq``, most frequent first
    """
    cells = df.loc[df[col_name].notna(), col_name].str.split(sep)
    # Empty strings (e.g. produced by a trailing separator) are not words.
    counter = Counter(word for word_list in cells for word in word_list if word)
    # most_common() already yields (word, freq) pairs sorted by descending freq.
    return pd.DataFrame(counter.most_common(), columns=['word', 'freq'])


def count_word_freq(file_path, col_name, to_file, sep='; ', multi_table=False):
    """
    Count word frequencies in one column of an Excel file.

    :param file_path: path of the Excel file to read
    :param col_name: name of the column that holds the words
    :param to_file: path of the Excel file to write
    :param sep: separator between words inside a cell
    :param multi_table: if True, process every sheet of the workbook and write
        one result sheet per input sheet (same sheet names); otherwise only the
        first sheet is processed
    :return: None
    """
    if multi_table:
        # The first row must be read as the header, otherwise the columns are
        # numbered 0..n and ``df[col_name]`` raises KeyError.
        sheets = pd.read_excel(file_path, sheet_name=None)
        # The context manager saves and closes the workbook on exit; an explicit
        # writer.save() is redundant and no longer exists in pandas 2.x.
        with pd.ExcelWriter(to_file) as writer:
            for sheet_name, df in sheets.items():
                words_df = _word_freq_frame(df, col_name, sep)
                words_df.to_excel(excel_writer=writer, sheet_name=sheet_name, index=False)
    else:
        df = pd.read_excel(file_path)
        _word_freq_frame(df, col_name, sep).to_excel(to_file, index=False)


if __name__ == '__main__':
    # Count word frequencies of the "keyword" column in every sheet of data.xlsx,
    # splitting cells on the default '; ' separator, and save the result to res.xlsx.
    count_word_freq(file_path='data.xlsx', col_name='keyword', to_file='res.xlsx', multi_table=True)
經驗分享:注意輸入格式為excel文件,這也是我學習生活中常用的處理方式,直接拿去用,非常方便
另外,在我之前的一篇博客中,我介紹了Python統計詞頻常用的幾種方式,不同的場景可以滿足你各自的需求。博客傳送門:
https://www.cnblogs.com/zhangyafei/p/10653977.html
常用函數二:word2vec
word2vec是一種詞向量技術,核心思想是把單詞轉換成向量:意思越相近的單詞,其向量間的距離越近,反之則越遠。實際使用的體驗也非常好。
# -*- coding: utf-8 -*-
"""
Datetime: 2019/7/25
Author: Zhang Yafei
Description: word2vec

Layout of data.txt (one document per line, words separated by whitespace):

    word1 word2 word3 ...
    word1 word2 word3 ...
    word1 word2 word3 ...
    ...
"""
import warnings

warnings.filterwarnings(action='ignore', category=UserWarning, module='gensim')
from gensim.models.word2vec import LineSentence
from gensim.models import Word2Vec


def word2vec_model_train(file, model_path, ):
    """Train a Word2Vec model on a line-per-document corpus and save it.

    :param file: path of the corpus text file (see the module docstring)
    :param model_path: path the trained model is saved to
    :return: None
    """
    # NOTE(review): ``size`` and ``iter`` are the gensim < 4.0 keyword names
    # (renamed ``vector_size`` / ``epochs`` in gensim 4) - confirm against the
    # installed gensim version before changing them.
    model = Word2Vec(LineSentence(file), size=100, window=5, iter=10, min_count=5)
    model.save(model_path)


def word2vec_load(self=None, model_path=None):
    """Load a saved model and print a few similarity queries.

    :param self: unused; this is a module-level function, the parameter is kept
        (now optional) only so that existing positional calls keep working
    :param model_path: path of the model written by ``word2vec_model_train``
    :raises TypeError: if ``model_path`` is not supplied
    :return: None
    """
    if model_path is None:
        raise TypeError("word2vec_load() missing required argument: 'model_path'")
    model = Word2Vec.load(model_path)
    # Query through ``model.wv`` for both calls so the two lookups are consistent.
    print(model.wv.similarity('生育意願', '主觀幸福感'))
    for key in model.wv.similar_by_word('新生代農民工', topn=50):
        print(key)


if __name__ == "__main__":
    word2vec_model_train(file='data.txt', model_path='word2vec_keywords.model')
    # word2vec_load(model_path='word2vec_keywords.model')
常用函數三:doc2vec
doc2vec和word2vec類似:word2vec是詞向量技術,那麼doc2vec見名知意就是文檔向量技術,可以將一篇文檔轉換成一個向量。理論上講,意思越相近的句子,其向量間的距離越近。
# -*- coding: utf-8 -*-
"""
Datetime: 2019/7/14
Author: Zhang Yafei
Description: doc2vec

Format of the training documents:

    TaggedDocument([word1, word2, ...], [doc tag])
    TaggedDocument([word1, word2, ...], [doc tag])
    TaggedDocument([word1, word2, ...], [doc tag])
    ...
"""
import os
import warnings

warnings.filterwarnings(action='ignore', category=UserWarning, module='gensim')
from matplotlib import pyplot as plt
from sklearn import metrics
from sklearn.cluster import KMeans
import pandas as pd
import numpy as np
from gensim.models.doc2vec import Doc2Vec, TaggedDocument

output_dir = 'res'
model_dir = 'model'

# Make sure the model and result folders exist before anything is written.
os.makedirs(model_dir, exist_ok=True)
os.makedirs(output_dir, exist_ok=True)


def data_preparetion():
    """
    Lazily yield the corpus as tagged documents.

    Reads the ``text`` column of data/data.xlsx, splits every cell on
    whitespace and tags it with its row position.

    :return: generator of TaggedDocument, e.g.
        TaggedDocument(words=['contribut', 'antarctica', 'past', 'futur', 'sea-level', 'rise'], tags=[0]),
        TaggedDocument(words=['evid', 'limit', 'human', 'lifespan'], tags=[1]), ...
    """
    print('開始准備文檔語料')
    df = pd.read_excel('data/data.xlsx')
    for index, doc in enumerate(df.text):
        yield TaggedDocument(doc.split(), [index])


def get_datasest():
    """Return the same tagged documents as ``data_preparetion`` as a list."""
    df = pd.read_excel('data/data.xlsx')
    return [TaggedDocument(doc.split(), [index]) for index, doc in enumerate(df.text)]


class Doc2VecModel(object):
    """
    Thin wrapper around a gensim Doc2Vec model: training, similarity lookup
    and KMeans-based cluster-number diagnostics.
    """

    def __init__(self, vector_size=100, dm=0, window=10, epochs=30, iter_num=10):
        """
        :param vector_size: dimensionality of the document vectors
        :param dm: training algorithm switch, forwarded to Doc2Vec unchanged
        :param window: context window size
        :param epochs: number of training epochs stored on the model
        :param iter_num: accepted for backward compatibility only. It used to
            be forwarded as ``iter``, the legacy alias of ``epochs``; passing
            both names to Doc2Vec is contradictory, so only ``epochs`` is
            forwarded now.
        """
        self.model = Doc2Vec(vector_size=vector_size, dm=dm, window=window, epochs=epochs)

    def run(self, documents, model_path, epochs=30):
        """
        Build the vocabulary, train the model and save it under ``model_dir``.

        :param documents: iterable of TaggedDocument
        :param model_path: file name of the saved model (inside ``model_dir``)
        :param epochs: number of training epochs
        :return: None
        """
        # The corpus is traversed once by build_vocab() and again by train().
        # A one-shot iterator (such as the generator from data_preparetion())
        # would already be exhausted for train(), so materialise it first.
        if iter(documents) is documents:
            documents = list(documents)
        # Build the vocabulary from the corpus.
        print('開始構建詞匯表')
        self.model.build_vocab(documents)
        print('開始訓練')
        self.model.train(documents, total_examples=self.model.corpus_count, epochs=epochs)
        # Save the model.
        self.model.save(f'{model_dir}/{model_path}')
        print(f'{model_path}\t保存成功')

    @staticmethod
    def simlarity_cal(vector1, vector2):
        """Return the cosine similarity of two vectors (0 if either has zero norm)."""
        vector1_mod = np.sqrt(vector1.dot(vector1))
        vector2_mod = np.sqrt(vector2.dot(vector2))
        if vector2_mod != 0 and vector1_mod != 0:
            simlarity = (vector1.dot(vector2)) / (vector1_mod * vector2_mod)
        else:
            simlarity = 0
        return simlarity

    def model_test(self):
        """Print the 10 training documents most similar to a sample sentence."""
        # NOTE(review): ``docvecs`` is the gensim 3.x attribute name - confirm
        # against the installed gensim version.
        doc2vec_model = Doc2Vec.load(f'{model_dir}/doc2vec.model')
        datasets = get_datasest()
        sentence1 = '老年人 生活滿意度 影響 全國 老年人口 健康狀況 調查數據 以往 社會經濟因素 健康 因素 人口因素 老年人 生活滿意度 影響 基礎 引入 變量 模型 分析 老年人 生活滿意度 自評 影響 統計 控制 影響因素 基礎 老年人 性格 情緒 孤獨感 焦慮 程度 生活滿意度 自評 影響 影響 原有 模型 變量 變化 生活滿意度 老年人'
        # infer_vector() expects a list of tokens; a raw string would be
        # iterated character by character.
        inferred_vector = doc2vec_model.infer_vector(sentence1.split())
        sims = doc2vec_model.docvecs.most_similar([inferred_vector], topn=10)
        for count, sim in sims:
            sentence = datasets[count]
            words = ''.join(f'{word} ' for word in sentence[0])
            print(words, sim, len(sentence[0]))

    def get_topic_num(self, min_topic_num, max_topic_num):
        """
        Cluster the document vectors with KMeans for every cluster count in
        ``[min_topic_num, max_topic_num]`` and plot inertia, silhouette score
        and Calinski-Harabasz score against the cluster count.

        :param min_topic_num: smallest number of clusters to try
        :param max_topic_num: largest number of clusters to try (inclusive)
        :return: None
        """
        # NOTE(review): ``docvecs.vectors_docs`` is the gensim 3.x attribute
        # name - confirm against the installed gensim version.
        doc2vec_model = Doc2Vec.load(f'{model_dir}/doc2vec.model')
        vectors_docs = doc2vec_model.docvecs.vectors_docs
        # scikit-learn renamed calinski_harabaz_score to
        # calinski_harabasz_score; use whichever name is available.
        ch_score_func = getattr(metrics, 'calinski_harabasz_score', None) \
            or metrics.calinski_harabaz_score
        silhouette_score_dict = {}
        ch_score_dict = {}
        inertia_score = {}
        cluster_nums = range(min_topic_num, max_topic_num + 1)
        for n in cluster_nums:
            km = KMeans(n_clusters=n)
            km.fit(X=vectors_docs)
            pre_labels = km.labels_
            inertia = km.inertia_
            sil_score = metrics.silhouette_score(X=vectors_docs, labels=pre_labels)
            ch_score = ch_score_func(X=vectors_docs, labels=pre_labels)
            print(f'{n} inertia score: {inertia} silhouette_score: {sil_score} ch score: {ch_score}')
            inertia_score[n] = inertia
            silhouette_score_dict[n] = sil_score
            ch_score_dict[n] = ch_score

        self.plot_image(data=silhouette_score_dict, xticks=cluster_nums,
                        title='不同聚類個數下silhouette_score對比', xlabel='cluster_num',
                        ylabel='silhouette_score')
        self.plot_image(data=ch_score_dict, xticks=cluster_nums,
                        title='不同聚類個數下calinski_harabaz_score對比', xlabel='cluster_num',
                        ylabel='calinski_harabaz_score')
        self.plot_image(data=inertia_score, xticks=cluster_nums,
                        title='不同聚類個數下inertia score對比', xlabel='cluster_num',
                        ylabel='inertia_score')

    @staticmethod
    def plot_image(data, title, xticks, xlabel, ylabel):
        """
        Plot ``data`` (x -> y mapping) as a line chart, save it as
        ``<output_dir>/<title>.png`` and show it.
        """
        plt.rcParams['font.sans-serif'] = ['SimHei']
        plt.figure(figsize=(8, 4), dpi=500)
        # Pass real sequences rather than dict views to matplotlib.
        plt.plot(list(data.keys()), list(data.values()), '#007A99')
        plt.xticks(xticks)
        plt.xlabel(xlabel)
        plt.ylabel(ylabel)
        plt.title(title)
        plt.savefig(f'{output_dir}/{title}.png', bbox_inches='tight', pad_inches=0.1)
        plt.show()


if __name__ == '__main__':
    docs = data_preparetion()
    model = Doc2VecModel(vector_size=100, epochs=30, window=10, dm=0, iter_num=20)
    model.run(documents=docs, model_path='doc2vec.model')
    # model.model_test()
    # model.get_topic_num(min_topic_num=5, max_topic_num=40)
常用函數四:LDA主題分析
LDA(Latent Dirichlet Allocation)是文檔主題生成模型中最有代表性的一種。LDA於2003年由David Blei等人提出,由於其應用簡單且有效,在學術界被廣泛應用於主題聚類、熱點識別、演化分析等領域。
# -*- coding: utf-8 -*-
"""
Datetime: 2019/7/14
Author: Zhang Yafei
Description: LDA topic model

Install dependencies:
    pip install pandas numpy matplotlib scikit-learn

Usage:
1. Data preparation
    index, docs = data_preparetion(path='data/數據.xlsx', doc_col='摘要')
    The data file is an Excel workbook; ``path`` is its location and
    ``doc_col`` the name of the document column. Adjust both to your data.
2. LDA parameters - scanning a range of topic numbers
    def main(index=index, docs=docs, test_topic_num=True, tfidf=False, max_iter=50,
             min_topic=5, max_topic=30, topic_word_num=20)
    :param index: document index
    :param docs: documents
    :param n_topics: number of topics
    :param tfidf: encode the documents with tf-idf instead of raw counts
    :param max_iter: maximum number of iterations
    :param min_topic: smallest topic number (only used if test_topic_num=True)
    :param max_topic: largest topic number (only used if test_topic_num=True)
    :param learning_offset: learning offset of the online LDA update
    :param random_state: random seed
    :param test_topic_num: scan a range of topic numbers instead of training one model
    :param topic_word_num: number of words kept per topic in the topic-word table
"""
import json
import os
import time
from functools import wraps

import numpy as np
import pandas as pd
import scipy
import scipy.stats  # ``import scipy`` alone does not guarantee scipy.stats is loaded
from matplotlib import pyplot as plt
from sklearn.decomposition import LatentDirichletAllocation
from sklearn.feature_extraction.text import CountVectorizer, TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity

pd.set_option('display.max_columns', None)

output_dir = 'res'

os.makedirs(output_dir, exist_ok=True)


def timeit(func):
    """Decorator that prints the wall-clock time spent in ``func``."""

    @wraps(func)
    def inner(*args, **kwargs):
        start_time = time.time()
        ret = func(*args, **kwargs)
        elapsed = time.time() - start_time
        if elapsed < 60:
            print(f'共花費時間:', round(elapsed, 2), '秒')
        else:
            minute, sec = divmod(elapsed, 60)
            print(f'花費時間\t{round(minute)}分\t{round(sec, 2)}秒')
        return ret

    return inner


class Articles(object):
    """
    Iterable that turns lists of (possibly multi-word) keywords into
    space-joined documents whose keywords are single tokens.
    """

    def __init__(self, data, stopwords=False):
        """
        :param data: iterable of word lists
        :param stopwords: if True, load data/stopwords.txt and drop those words
        """
        self.data = data
        if stopwords:
            # Close the file deterministically instead of leaking the handle.
            # NOTE(review): the file is read with the platform default
            # encoding, as before - confirm the encoding of stopwords.txt.
            with open('data/stopwords.txt') as f:
                self.stopwords = {line.strip() for line in f}
        else:
            self.stopwords = None

    def __iter__(self):
        if self.stopwords:
            for word_list in self.data:
                yield ' '.join(self.pro_words_with_stopwords(word_list))
        else:
            for word_list in self.data:
                yield ' '.join(self.pro_words(word_list))

    @staticmethod
    def word_replace(word):
        """Encode separators inside a keyword as runs of underscores."""
        return word.replace(' & ', '_____').replace('/', '___').replace(', ', '__'). \
            replace(',', '__').replace(' ', '_').replace('-', '____'). \
            replace('(', '______').replace(')', '______')

    def pro_words_with_stopwords(self, word_list):
        """Encode every word of ``word_list`` that is not a stop word."""
        return (self.word_replace(word) for word in word_list if word.lower() not in self.stopwords)

    def pro_words(self, word_list):
        """Encode every word of ``word_list``."""
        return (self.word_replace(word) for word in word_list)


class SklearnLDA(object):
    """LDA topic modelling on top of scikit-learn, with model-selection helpers."""

    def __init__(self, corpus, n_topics, tf_idf=True, max_iter=10, learning_method='online',
                 learning_offset=50., random_state=0, res_dir='res', english_words_fixed=False):
        """
        :param corpus: iterable of documents (strings)
        :param n_topics: number of topics of the model used by ``train``
        :param tf_idf: vectorise with TfidfVectorizer instead of CountVectorizer
        :param max_iter: maximum number of LDA iterations
        :param learning_method: LDA learning method
        :param learning_offset: LDA learning offset
        :param random_state: random seed
        :param res_dir: folder all results are written to
        :param english_words_fixed: decode the underscore-encoded vocabulary
            produced by ``Articles.word_replace``
        """
        self.tfidf = tf_idf
        # Define the LDA model.
        self.lda_model = LatentDirichletAllocation(n_components=n_topics, max_iter=max_iter,
                                                   doc_topic_prior=0.001,
                                                   topic_word_prior=0.02,
                                                   learning_method=learning_method,
                                                   learning_offset=learning_offset,
                                                   random_state=random_state)
        print('正在將語料轉化為向量------------')
        self.vectorizer = TfidfVectorizer() if tf_idf else CountVectorizer()
        # Turn the corpus into a bag-of-words matrix.
        self.bow_corpus = self.vectorizer.fit_transform(corpus)
        # Vocabulary, in feature order.
        self.vocab = self.fixed_vocab() if english_words_fixed else self._feature_names()
        self.res_dir = res_dir

    def _feature_names(self):
        """Return the vectoriser's vocabulary as a list of plain strings.

        Recent scikit-learn releases only provide ``get_feature_names_out``;
        older ones only ``get_feature_names``.
        """
        getter = getattr(self.vectorizer, 'get_feature_names_out', None)
        if getter is None:
            getter = self.vectorizer.get_feature_names
        return [str(name) for name in getter()]

    def fixed_vocab(self):
        """Decode the underscore runs written by ``Articles.word_replace``.

        NOTE(review): both parentheses are encoded as six underscores, which
        this decoder turns into ' & ' followed by a space - the parentheses
        cannot be recovered.
        """
        return [
            vocab.replace('_____', ' & ').replace('____', '-').replace('___', '/')
                 .replace('__', ',').replace('_', ' ')
            for vocab in self._feature_names()]

    def _save_topic_tables(self, index, n_topics, doc_topic_matrix, topic_word_matrix,
                           topic_word_num, doc_topic_path, topic_word_path):
        """Save the ranked doc-topic and topic-word tables as CSV files."""
        doc_topic_columns = [f'topic{num}' for num in range(1, n_topics + 1)]
        topic_word_columns = [f'word{num}' for num in range(1, topic_word_num + 1)]
        topic_word_index = pd.Index(data=doc_topic_columns, name='topic')
        # Per document: topic positions ordered from most to least probable.
        doc_topic_data = np.argsort(-doc_topic_matrix, axis=1)
        # Per topic: the ``topic_word_num`` highest-weighted words.
        top_word_ids = np.argsort(-topic_word_matrix, axis=1)[:, :topic_word_num]
        topic_word_data = np.array(self.vocab)[top_word_ids]
        self.save_data(file_path=doc_topic_path, data=doc_topic_data,
                       columns=doc_topic_columns, index=index)
        self.save_data(file_path=topic_word_path, data=topic_word_data,
                       columns=topic_word_columns, index=topic_word_index)

    def get_topic_num(self, index, max_iter=10, min_topic=5, max_topic=30, learning_offset=50.,
                      random_state=0, topic_word_num=30):
        """
        Fit one LDA model per topic number in ``[min_topic, max_topic]`` and
        plot perplexity, average KL / JS divergence, average cosine similarity
        and the weighted score to help choose the number of topics.

        Results of each topic number are stored in ``<res_dir>/<n_topics>``;
        if the topic-word CSV is already there, the saved matrices are loaded
        instead of refitting (no perplexity is available in that case).
        """
        print('開始訓練模型, 計算困惑度')
        perplexity_dict = {}
        kld_list = {}
        jsd_list = {}
        cos_sim_list = {}
        w_score_dict = {}
        x_ticks = list(range(min_topic, max_topic + 1))
        for n_topics in x_ticks:
            result_dir = f'{self.res_dir}/{n_topics}'
            os.makedirs(result_dir, exist_ok=True)
            if os.path.exists(f'{result_dir}/topic-word-{topic_word_num}.csv'):
                doc_topic_matrix = np.loadtxt(f'{result_dir}/doc_topic_matrix.txt')
                topic_word_matrix = np.loadtxt(f'{result_dir}/topic_word_matrix.txt')
            else:
                # Define and fit the LDA model.
                lda = LatentDirichletAllocation(n_components=n_topics, max_iter=max_iter,
                                                learning_method='online',
                                                doc_topic_prior=0.001,
                                                topic_word_prior=0.02,
                                                learning_offset=learning_offset,
                                                random_state=random_state)
                doc_topic_matrix = lda.fit_transform(self.bow_corpus)
                topic_word_matrix = lda.components_
                # Perplexity.
                perplexity = lda.perplexity(self.bow_corpus)
                perplexity_dict[n_topics] = perplexity
                print(f'topic: {n_topics}\tsklearn preplexity: {perplexity:.3f}')
                # Save the matrices and the ranked tables.
                np.savetxt(f'{result_dir}/doc_topic_matrix.txt', doc_topic_matrix)
                np.savetxt(f'{result_dir}/topic_word_matrix.txt', topic_word_matrix)
                self._save_topic_tables(
                    index=index, n_topics=n_topics,
                    doc_topic_matrix=doc_topic_matrix, topic_word_matrix=topic_word_matrix,
                    topic_word_num=topic_word_num,
                    doc_topic_path=f'{result_dir}/doc-topic.csv',
                    topic_word_path=f"{result_dir}/topic-word-{topic_word_num}.csv")

            # Weighted score: max mean doc-topic probability / mean topic similarity.
            w_score = self.weight_score(doc_topic_matrix, topic_word_matrix)
            w_score_dict[n_topics] = w_score

            # Average KL and JS divergence over all ordered topic pairs.
            kld_sum = 0
            jsd_sum = 0
            for topic_vec1 in topic_word_matrix:
                for topic_vec2 in topic_word_matrix:
                    kld_sum += self.kl_divergence(topic_vec1, topic_vec2)
                    jsd_sum += self.js_divergence(topic_vec1, topic_vec2)
            avg_kld = kld_sum / (n_topics ** 2)
            kld_list[n_topics] = avg_kld
            avg_jsd = jsd_sum / (n_topics ** 2)
            jsd_list[n_topics] = avg_jsd

            # Average cosine similarity between *different* topics. The
            # denominator counts the n*(n-1) off-diagonal pairs, so the
            # self-similarities on the diagonal must not be part of the sum.
            cos_sim_matrix = cosine_similarity(X=topic_word_matrix)
            off_diagonal_sum = cos_sim_matrix.sum() - np.trace(cos_sim_matrix)
            cos_sim = off_diagonal_sum / (n_topics * (n_topics - 1))
            cos_sim_list[n_topics] = cos_sim

            print(f'topic: {n_topics}\tavg KLD: {avg_kld:.3f}')
            print(f'topic: {n_topics}\tavg JSD: {avg_jsd:.3f}')
            print(f'topic: {n_topics}\tcosine_similarity: {cos_sim:.3f}')
            print(f'topic: {n_topics}\tweight_score: {w_score:.3f}')

        # Plots. Perplexity exists only for the topic numbers fitted in this run.
        if perplexity_dict:
            self.plot_image(data=perplexity_dict, x_ticks=list(perplexity_dict.keys()),
                            title='lda_topic_perplexity', xlabel='topic num', ylabel='perplexity')
        self.plot_image(data=kld_list, x_ticks=x_ticks, title='lda_topic_KLD',
                        xlabel='topic num', ylabel='KLD')
        self.plot_image(data=jsd_list, title='lda_topic_JSD', x_ticks=x_ticks,
                        xlabel='topic num', ylabel='JSD')
        self.plot_image(data=cos_sim_list, title='lda_topic_cosine_simlarity', x_ticks=x_ticks,
                        xlabel='topic num', ylabel='cosine_simlarity')
        self.plot_image(data=w_score_dict, title='lda_topic_weight_score', x_ticks=x_ticks,
                        xlabel='topic num', ylabel='weight_score')

    def train(self, index, topic_word_num=10, save_matrix=True, save_data=True,
              print_doc_topic=False, print_topic_word=True, save_vocab=True):
        """
        Fit the LDA model created in ``__init__`` and export its results.

        All files are written to ``self.res_dir`` (the default, 'res', is the
        folder that was previously hard-coded).

        :param index: document index used for the doc-topic table
        :param topic_word_num: number of words kept per topic
        :param save_matrix: save the raw doc-topic / topic-word matrices
        :param save_data: save the ranked doc-topic / topic-word CSV tables
        :param print_doc_topic: print the top 5 topics of every document
        :param print_topic_word: print the top 10 words of every topic
        :param save_vocab: save the vocabulary as JSON to vocab.txt
        """
        print('正在訓練模型')
        doc_topic_matrix = self.lda_model.fit_transform(self.bow_corpus)
        topic_word_matrix = self.lda_model.components_
        out_dir = self.res_dir
        # File-name infix that distinguishes tf-idf runs from count runs.
        matrix_tag = '_tfidf' if self.tfidf else ''

        if save_vocab:
            with open(f'{out_dir}/vocab.txt', 'w') as f:
                json.dump(list(self.vocab), f)

        if save_matrix:
            print('正在保存矩陣')
            np.savetxt(f'{out_dir}/doc_topic{matrix_tag}_matrix.txt', doc_topic_matrix)
            np.savetxt(f'{out_dir}/topic_word{matrix_tag}_matrix.txt', topic_word_matrix)

        if save_data:
            print('正在保存數據')
            if self.tfidf:
                doc_topic_path = f'{out_dir}/doc-topic_tfidf.csv'
                topic_word_path = f"{out_dir}/topic-word-tfidf_{topic_word_num}.csv"
            else:
                doc_topic_path = f'{out_dir}/doc-topic.csv'
                topic_word_path = f"{out_dir}/topic-word-{topic_word_num}.csv"
            self._save_topic_tables(
                index=index, n_topics=self.lda_model.n_components,
                doc_topic_matrix=doc_topic_matrix, topic_word_matrix=topic_word_matrix,
                topic_word_num=topic_word_num,
                doc_topic_path=doc_topic_path, topic_word_path=topic_word_path)

        if print_doc_topic:
            print('正在輸出文檔-主題')
            for doc_num, doc_topic_index in zip(index, np.argsort(-doc_topic_matrix, axis=1)):
                print(f'{doc_num}:\t{doc_topic_index[:5]}')

        if print_topic_word:
            print('正在輸出主題-詞')
            vocab_array = np.array(self.vocab)
            for topic_num, topic_word_index in enumerate(np.argsort(-topic_word_matrix, axis=1)):
                words_list = vocab_array[topic_word_index][: 10]
                print(f'主題{topic_num}:\t{words_list}')

    @staticmethod
    def save_data(file_path, data, columns, index):
        """Save ``data`` as a CSV file (UTF-8 with BOM) and report success."""
        df = pd.DataFrame(data=data, columns=columns, index=index)
        df.to_csv(file_path, encoding='utf_8_sig')
        print(f'{file_path}\t保存成功')

    @staticmethod
    def kl_divergence(p, q):
        """
        Kullback-Leibler divergence (relative entropy) of ``p`` from ``q``.

        The more similar the two distributions, the smaller the value. It is
        non-negative and not symmetric: swapping ``p`` and ``q`` generally
        gives a different result.
        """
        return scipy.stats.entropy(p, q)

    @staticmethod
    def js_divergence(p, q):
        """
        Jensen-Shannon divergence of ``p`` and ``q``.

        Built from the KL divergence to the midpoint ``(p + q) / 2``. It is
        symmetric, 0 for identical distributions and grows as they differ.
        With the natural logarithm used here it is bounded by ln 2.
        """
        M = (p + q) / 2
        return 0.5 * scipy.stats.entropy(p, M) + 0.5 * scipy.stats.entropy(q, M)

    @staticmethod
    def weight_score(doc_topic_matrix, topic_word_matrix):
        """
        Ratio of the mean maximum doc-topic probability to the mean
        similarity between topics (higher is better).
        """
        # Mean over documents of the probability of their dominant topic.
        max_mean_topic_prob = np.mean(np.max(doc_topic_matrix, axis=1))
        # Mean cosine similarity between topics; entries above 0.99 (the
        # diagonal self-similarities) are zeroed before averaging.
        topic_cos_sim_matrix = cosine_similarity(X=topic_word_matrix)
        topic_num = topic_cos_sim_matrix.shape[0]
        mean_topic_sim = np.sum(np.where(topic_cos_sim_matrix > 0.99, 0, topic_cos_sim_matrix)) / (
                topic_num * (topic_num - 1))
        # Weighted score.
        weight_score = max_mean_topic_prob / mean_topic_sim
        return weight_score

    def plot_image(self, data, title, x_ticks, xlabel, ylabel):
        """Plot ``data`` (x -> y mapping), save it to ``<res_dir>/<title>.png`` and show it."""
        plt.figure(figsize=(12, 6), dpi=180)
        plt.plot(list(data.keys()), list(data.values()), '#007A99')
        plt.xticks(x_ticks)
        plt.xlabel(xlabel)
        plt.ylabel(ylabel)
        plt.title(title)
        plt.savefig(f'{self.res_dir}/{title}.png', bbox_inches='tight', pad_inches=0.1)
        plt.show()


def data_preparetion(path, doc_col, index_col=None, sep=None, english_words_fixed=False, stopwords=False):
    """
    Load the documents and their index from an Excel file.

    :param path: path of the Excel file
    :param doc_col: name of the document column (rows with an empty cell are dropped)
    :param index_col: name of the index column; the DataFrame index is used if omitted
    :param sep: if given, every document cell is split on this separator
    :param english_words_fixed: wrap the documents in ``Articles`` so that
        multi-word keywords become single tokens
    :param stopwords: forwarded to ``Articles``
    :return: (index, documents)
    """
    df = pd.read_excel(path)
    df.dropna(subset=[doc_col], inplace=True)
    if sep:
        docs = iter(df[doc_col].str.split(sep))
    else:
        docs = iter(df[doc_col])
    if english_words_fixed:
        documents = Articles(data=docs, stopwords=stopwords)
    else:
        documents = docs
    index_list = df[index_col] if index_col else df.index
    return index_list, documents


@timeit
def main(index, docs, n_topics=10, tfidf=False, max_iter=5, min_topic=5, max_topic=30, learning_offset=50.,
         random_state=0,
         test_topic_num=False, topic_word_num=30, res_dir='res', english_words_fixed=False):
    """
    Entry point.

    :param index: document index
    :param docs: documents
    :param n_topics: number of topics
    :param tfidf: encode the documents with tf-idf instead of raw counts
    :param max_iter: maximum number of iterations
    :param min_topic: smallest topic number (only used if test_topic_num=True)
    :param max_topic: largest topic number (only used if test_topic_num=True)
    :param learning_offset: learning offset of the online LDA update
    :param random_state: random seed
    :param test_topic_num: scan a range of topic numbers instead of training one model
    :param topic_word_num: number of words kept per topic in the topic-word table
    :param res_dir: result folder (created if missing, including parents)
    :param english_words_fixed: decode the underscore-encoded vocabulary
    :return: None
    """
    os.makedirs(res_dir, exist_ok=True)
    lda = SklearnLDA(corpus=docs, n_topics=n_topics, max_iter=max_iter, tf_idf=tfidf,
                     learning_offset=learning_offset,
                     random_state=random_state, res_dir=res_dir, english_words_fixed=english_words_fixed)
    if test_topic_num:
        lda.get_topic_num(index=index, max_iter=max_iter, min_topic=min_topic, max_topic=max_topic,
                          learning_offset=learning_offset, random_state=random_state,
                          topic_word_num=topic_word_num)
    else:
        lda.train(index=index, save_matrix=True, save_data=True,
                  print_doc_topic=False, print_topic_word=True, topic_word_num=topic_word_num)


if __name__ == '__main__':
    # Data preparation.
    # index, docs = data_preparetion(path='data/山西政策3.xlsx', doc_col='標題分詞')
    index, docs = data_preparetion(path='data/COVID-19-2020.xlsx', doc_col='keywords', index_col='PMID',
                                   sep='; ', english_words_fixed=True, stopwords=False)

    # Scan a range of topic numbers.
    main(index=index, docs=docs, test_topic_num=True, tfidf=False, max_iter=50, min_topic=5, max_topic=10,
         topic_word_num=20, res_dir='res/聚類結果', english_words_fixed=True)

    # Train a single model with a fixed topic number.
    # main(index=index, docs=docs, n_topics=19, tfidf=False, max_iter=50)
topic_evolution.py
# -*- coding: utf-8 -*-
'''
Datetime: 2019/08/16
author: Zhang Yafei
description:
colormap https://blog.csdn.net/Mr_Cat123/article/details/78638491
'''
import warnings

warnings.filterwarnings(action='ignore', category=UserWarning, module='gensim')
warnings.filterwarnings(action='ignore', category=UserWarning, module='gensim.matutils')
import pandas as pd
import numpy as np
import os
from gensim.models import Word2Vec
import seaborn as sns
import matplotlib.pyplot as plt

plt.rcParams['font.sans-serif'] = ['SimHei']


class TopicEvolution(object):
    """Topic-intensity evolution plots and topic-proximity scores for LDA results."""

    def __init__(self, data_path, doc_topic_matrix_path=None, topic_word_csv_path=None,
                 keywords_txt=None):
        """
        :param data_path: Excel file with the documents
        :param doc_topic_matrix_path: text file with the doc-topic matrix; may
            contain a ``{}`` placeholder for the topic number
        :param topic_word_csv_path: CSV file with the topic-word table
        :param keywords_txt: output path used by ``extract_keywords_txt``
        """
        self.data_path = data_path
        self.topic_word_csv_path = topic_word_csv_path
        self.doc_topic_matrix_path = doc_topic_matrix_path
        self.keywords_txt = keywords_txt

    def topic_intensity_evolution(self, start_year, end_year, topic_num, res_dir='res', space=1):
        """
        Plot the mean topic intensity as a bar chart, and the per-year topic
        intensity as a heat map and a line chart, into ``<res_dir>/<topic_num>``.

        :param start_year: first year (inclusive)
        :param end_year: last year (exclusive)
        :param topic_num: number of topics of the doc-topic matrix
        :param res_dir: result folder
        :param space: step between the years that are evaluated
        """
        df = pd.read_excel(self.data_path)
        doc_topic_matrix = np.loadtxt(self.doc_topic_matrix_path.format(topic_num))

        # Bar chart: mean intensity of every topic over all documents.
        x = [f'topic{num}' for num in range(1, topic_num + 1)]
        y = doc_topic_matrix.mean(axis=0)
        print(x, np.mean(y))
        self.plot_bar(x=x, y=y, path=f'{res_dir}/{topic_num}/柱狀圖.png')

        # Heat map: mean intensity of every topic per year.
        doc_topic_df = pd.DataFrame(data=doc_topic_matrix)
        doc_topic_df.index = df['年']
        years = list(range(start_year, end_year, space))
        # Select by boolean mask so the selection is always a DataFrame:
        # ``.loc[year, :]`` degrades to a single row (whose mean is one scalar)
        # when a year has exactly one document and raises KeyError when it has none.
        topic_intensity_df = pd.DataFrame(
            {year: doc_topic_df[doc_topic_df.index == year].mean() for year in years},
            columns=years)
        topic_intensity_df.index = [f'Topic {num}' for num in range(1, topic_num + 1)]
        self.plot_heatmap(data=topic_intensity_df, cmap='Reds', xlabel='年份', ylabel='主題',
                          path=f'{res_dir}/{topic_num}/熱力圖.png')

        # Line chart: one line per topic over the years.
        print(years, topic_intensity_df)
        # NOTE(review): this path is fixed, so the file is overwritten for
        # every topic number - confirm whether it should live under res_dir.
        topic_intensity_df.to_excel('res/topic_intensity.xlsx')
        self.plot(x=years, data_list=topic_intensity_df, path=f'{res_dir}/{topic_num}/折線圖.png')

    @staticmethod
    def plot(x, data_list, path=None):
        """Draw one line per row of ``data_list`` against ``x``, save and show it."""
        # Start from a fresh figure so earlier charts are not drawn over.
        plt.figure()
        for index in data_list.index.unique():
            y = list(data_list.loc[index, :])
            plt.plot(x, y, "x-", label=f'主題{index}')
        plt.savefig(path)
        plt.show()
        plt.close()

    @staticmethod
    def plot_bar(x, y, path=None):
        """Draw a bar chart with a dashed line at the mean of ``y``, save and show it."""
        plt.figure()
        plt.bar(x, y, width=0.5)
        plt.xticks(range(len(x)), x, rotation=45)
        plt.axhline(y=np.mean(y), xmin=.05, xmax=.95, ls='--', color='black')
        plt.savefig(path)
        plt.show()
        plt.close()

    @staticmethod
    def plot_heatmap(data, cmap, xlabel, ylabel, path=None):
        """Draw ``data`` as a heat map, save and show it."""
        plt.figure()
        if cmap:
            sns.heatmap(data, cmap=cmap)
        else:
            sns.heatmap(data)
        plt.xticks(rotation=45)
        plt.xlabel(xlabel)
        plt.ylabel(ylabel)
        # Save, then show the figure.
        plt.savefig(path)
        plt.show()
        plt.close()

    def extract_keywords_txt(self):
        """
        Write the ``keywords`` column of the data file to ``self.keywords_txt``,
        one document per line.

        :raises ValueError: if no ``keywords_txt`` path was configured
        """
        if not self.keywords_txt:
            raise ValueError('keywords_txt must be set to the output file path')
        df = pd.read_excel(self.data_path)
        with open(self.keywords_txt, 'w', encoding='utf-8') as f:
            f.writelines(f'{text}\n' for text in df.keywords)

    @staticmethod
    def word_replace(word):
        """Encode separators inside a keyword as runs of underscores."""
        return word.replace(' & ', '_____').replace('/', '___').replace(', ', '__').replace(',', '__').replace(' ',
                                                                                                              '_').replace(
            '-', '____').replace('(', '______').replace(')', '______')

    def _known_words(self, words, keywords):
        """Return the encoded form of every word of ``words`` that is in ``keywords``."""
        encoded = (self.word_replace(word) for word in words)
        return [word for word in encoded if word in keywords]

    def clac_inter_intimate(self, row, model, keywords):
        """
        Mean word2vec similarity between all pairs of distinct words of one topic.

        :param row: words of the topic
        :param model: trained Word2Vec model
        :param keywords: set of words that may be looked up
        :return: mean similarity (NaN if no pair could be compared)
        """
        # Encode and filter the words once instead of inside the nested loops.
        words = self._known_words(row, keywords)
        topic_internal_sim_sum = []
        for word1 in words:
            for word2 in words:
                if word1 == word2:
                    continue
                try:
                    topic_internal_sim_sum.append(model.wv.similarity(word1, word2))
                except KeyError:
                    # Word missing from the word2vec vocabulary: skip the pair.
                    continue
        return np.mean(topic_internal_sim_sum)

    def topic_intimate(self, model, topic_num=None):
        """
        Compute the topic proximity of the topic-word table:
        (intra-topic similarity - inter-topic similarity) / intra-topic similarity.

        :param model: trained Word2Vec model
        :param topic_num: number of topics, only echoed in the output
        :return: (topic_num, topic_proximity)
        """
        df = pd.read_csv(self.topic_word_csv_path, index_col=0)
        # NOTE(review): expects one word per line - confirm this matches how
        # the vocabulary file was written.
        with open('data/vocab.txt', encoding='utf-8') as f:
            keywords = {word.strip() for word in f if word}
        topic_inter_intimate = np.mean(df.apply(self.clac_inter_intimate, axis=1, args=(model, keywords)))

        rows = df.values.tolist()
        known_rows = [self._known_words(row, keywords) for row in rows]
        topic_exter_sim_sum = []
        for row1, words1 in zip(rows, known_rows):
            for row2, words2 in zip(rows, known_rows):
                if row1 == row2:
                    continue
                topic_exter_sim = []
                for word1 in words1:
                    for word2 in words2:
                        try:
                            topic_exter_sim.append(model.wv.similarity(word1, word2))
                        except KeyError:
                            continue
                topic_exter_sim_sum.append(np.mean(topic_exter_sim))
        # Similarity between different topics.
        topic_exter_intimate = np.mean(topic_exter_sim_sum)
        # proximity = (intra-topic - inter-topic) / intra-topic
        topic_proximity = (topic_inter_intimate - topic_exter_intimate) / topic_inter_intimate
        print(topic_num, topic_inter_intimate, topic_exter_intimate, topic_proximity)
        return topic_num, topic_proximity


def file_rename(dir_path, start, end):
    """Rename the doc-topic CSV of every topic number in ``[start, end)``."""
    for num in range(start, end):
        os.rename(f'res/2004-2018/{dir_path}/{num}/文檔-主題.csv', f'res/2004-2018/{dir_path}/{num}/doc-topic.csv')


def plot_image(data, title, x_ticks, xlabel, ylabel, output_dir=None):
    """Plot ``data`` (x -> y mapping); save it as ``<output_dir>/<title>.png`` if a folder is given."""
    plt.figure(figsize=(12, 6), dpi=180)
    # Pass real sequences rather than dict views to matplotlib.
    plt.plot(list(data.keys()), list(data.values()), '#007A99')
    plt.xticks(x_ticks)
    plt.xlabel(xlabel)
    plt.ylabel(ylabel)
    plt.title(title)
    if output_dir:
        plt.savefig(f'{output_dir}/{title}.png', bbox_inches='tight', pad_inches=0.1)
    plt.show()


def start_plot(start_year, end_year, data_path, doc_topic_matrix_path, res_dir, topic_num=None, min_topics=None,
               max_topics=None, space=1):
    """
    Draw the bar chart, line chart and heat map, either for every topic number
    in ``[min_topics, max_topics]`` or for the single ``topic_num``.
    """
    if min_topics and max_topics:
        for n_topics in range(min_topics, max_topics + 1):
            topic = TopicEvolution(data_path=data_path, doc_topic_matrix_path=doc_topic_matrix_path.format(n_topics))
            topic.topic_intensity_evolution(start_year=start_year, end_year=end_year, topic_num=n_topics,
                                            res_dir=res_dir, space=space)
    elif topic_num:
        topic = TopicEvolution(data_path=data_path, doc_topic_matrix_path=doc_topic_matrix_path)
        topic.topic_intensity_evolution(start_year=start_year, end_year=end_year, topic_num=topic_num, res_dir=res_dir,
                                        space=space)


def start_run(model_path, data_path, topic_word_csv_path, min_topics, max_topics, res_dir=None):
    """
    Compute the topic proximity for every topic number in ``[min_topics, max_topics]``.

    :param model_path: path of the trained Word2Vec model
    :param data_path: Excel file with the documents
    :param topic_word_csv_path: topic-word CSV path with a ``{}`` placeholder
        for the topic number
    :param min_topics: smallest topic number
    :param max_topics: largest topic number (inclusive)
    :param res_dir: currently unused
    :return: dict mapping topic number to topic proximity
    """
    topic_proximity_dict = {}
    model = Word2Vec.load(model_path)
    for n_topics in range(min_topics, max_topics + 1):
        topic = TopicEvolution(data_path=data_path, topic_word_csv_path=topic_word_csv_path.format(n_topics))
        # topic_intimate() returns (topic_num, proximity); keep only the score.
        _, proximity = topic.topic_intimate(topic_num=n_topics, model=model)
        topic_proximity_dict[n_topics] = proximity
    return topic_proximity_dict


if __name__ == "__main__":
    start_plot(min_topics=5, max_topics=30, start_year=1993, end_year=2018, data_path='GLP1.xlsx',
               doc_topic_matrix_path='res/{}/doc_topic_matrix.txt', res_dir='res', space=5)
    start_run(model_path='model/word2vec.model', data_path='data/GLP1.xlsx',
              topic_word_csv_path='res/{}/topic-word-30.csv', min_topics=5, max_topics=6)
經驗分享:我都寫好了,直接拿去用吧!