TensorFlow Text Classification in Practice: Convolutional Neural Networks (CNN)


First, the tools and environment: Python 3.6.8, TensorFlow 1.14.0, CentOS 7.0 (Ubuntu is preferable).

I will only briefly cover environment setup: I used pip to create a Python virtual environment (virtualenv) and installed TensorFlow inside it. Detailed steps are available on the TensorFlow website.

Note: this article draws on "Sina news text classification based on TensorFlow, CNN, and the Tsinghua THUCNews dataset".

Training Data

The training (train.txt) and test (test.txt) files share the same 100 classes; test.txt contains 200 test samples per class and train.txt contains 1,800 training samples per class. Each file has two columns: the first is the label, the second is the title (see the sketch below the download link).

Baidu Cloud link: https://pan.baidu.com/s/1MZX8SOJ7lerov_UqZhSWeQ
Extraction code: 9nvj
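
A minimal sketch of the row format and how it parses; the labels and titles in the comments below are made up for illustration, and only the two-column layout (label, then title, whitespace-separated) follows the description above:

#hypothetical rows in train.txt: <label> <title>
#  科技  新款智能手機今日發布
#  體育  主隊客場逆轉取勝
from itertools import islice
with open('train.txt', encoding='utf8') as f:
    for line in islice(f, 2):
        label, title = line.strip().split(maxsplit=1)
        print(label, '|', title)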

Training Code

Without further ado, here is the code. It supports saving the trained model; everything was tested before being pasted here and should be reproducible, with comments added at the relevant places. Feel free to leave a comment with any questions; I will reply when I have time.

################### CNN training code #################
#coding=utf8
import os
import codecs
import random
import numpy as np
import pandas as pd
import tensorflow as tf
import tensorflow.contrib.keras as kr
from collections import Counter                       #simple counter, used to tally character frequencies
from sklearn.preprocessing import LabelEncoder        #label encoding
from sklearn.metrics import confusion_matrix          #confusion matrix

labelEncoder = LabelEncoder()                         #label encoder
#display settings for printed output
#pd.set_option('display.max_columns', None)           #show all columns
pd.set_option('display.max_rows', None)               #show all rows
#pd.set_option('max_colwidth', 500)                   #max display width of a value (default 50)
np.set_printoptions(threshold=np.inf)                 #print arrays in full
os.environ["CUDA_VISIBLE_DEVICES"] = "1"              #make only one GPU visible

#CNN hyperparameters, adjust to your own data
vocab_size = 5000            #vocabulary size
seq_length = 100             #title sequence length
embedding_dim = 64           #embedding dimension
num_classes = 100            #number of classes (initial value; updated later from the training data)
num_filters = 128            #number of convolution filters
kernel_size = 5              #convolution kernel size
hidden_dim = 128             #fully connected layer units
dropout_keep_prob = 0.5      #dropout keep probability
learning_rate = 1e-3         #learning rate
batch_size = 100             #training batch size

with open('train.txt', encoding='utf8') as file:                                #load the training data
    line_list = [k.strip() for k in file.readlines()]                           #one line per training sample
    train_label_list = [k.split()[0] for k in line_list]                        #extract the labels
    train_content_list = [k.split(maxsplit=1)[1] for k in line_list]            #extract the titles

def getVocabularyList(content_list, vocabulary_size):
    allContent_str = ''.join(content_list)
    counter = Counter(allContent_str)
    vocabulary_list = [k[0] for k in counter.most_common(vocabulary_size)]
    return vocabulary_list
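
#a quick illustration of getVocabularyList with hypothetical ASCII input:
#  getVocabularyList(['aaa', 'bbc'], 2) -> ['a', 'b']
#character counts are a:3, b:2, c:1, and the two most frequent characters are kept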

def makeVocabularyFile(content_list, vocabulary_size):
    vocabulary_list = getVocabularyList(content_list, vocabulary_size)
    with open('vocab_last.txt', 'w', encoding='utf8') as file:
        for vocabulary in vocabulary_list:
            file.write(vocabulary + '\n')

#makeVocabularyFile(train_content_list, 5000)                                              #build a new character vocabulary file from the training data

with open('vocab.txt', encoding='utf8') as file:                                           #load the vocabulary (character) list
    vocabulary_list = [k.strip() for k in file.readlines()]
word2id_dict = dict([(b, a) for a, b in enumerate(vocabulary_list)])                       #character -> id mapping
content2idList = lambda content : [word2id_dict[word] for word in content if word in word2id_dict]
train_idlist_list = [content2idList(content) for content in train_content_list]            #training titles as id lists
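#quick sanity check (hypothetical title; the resulting ids depend on vocab.txt);
#characters missing from the vocabulary are silently dropped by content2idList:
#  print(content2idList('人工智能'))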

train_X = kr.preprocessing.sequence.pad_sequences(train_idlist_list, seq_length)           #pad/truncate every sample to seq_length
train_y = labelEncoder.fit_transform(train_label_list)                                     #encode all training labels as integers

num_classes = len(labelEncoder.classes_)                                                   #redefine the class count from the training data
#save the class labels for later prediction
y_lable = codecs.open('y_lable_last.txt', 'w', 'utf-8')
for label in labelEncoder.classes_:
    str1 = label + '\n'
    y_lable.write(str1)
y_lable.close()
print('number of classes in training:', num_classes)
train_Y = kr.utils.to_categorical(train_y, num_classes)            #one-hot encode the labels
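#an illustration with hypothetical labels of what the two encoding steps do:
#fit_transform maps the sorted unique labels to integer ids, e.g.
#  ['tech', 'sports', 'tech'] -> [1, 0, 1]   (classes_ == ['sports', 'tech'])
#and to_categorical then turns id 1 into the one-hot row [0, 1]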

#build the network; default initializers are used for W and b
tf.reset_default_graph()
X_holder = tf.placeholder(tf.int32, [None, seq_length])
Y_holder = tf.placeholder(tf.float32, [None, num_classes])
embedding = tf.get_variable('embedding', [vocab_size, embedding_dim])     #matrix of shape vocab_size*embedding_dim = 5000*64
embedding_inputs = tf.nn.embedding_lookup(embedding, X_holder)            #batch_size*seq_length*embedding_dim = 100*100*64
conv = tf.layers.conv1d(embedding_inputs, num_filters, kernel_size)       #shape batch_size*(seq_length-kernel_size+1)*num_filters = 100*96*128
max_pooling = tf.reduce_max(conv, reduction_indices=[1])                  #max pooling over time, shape batch_size*num_filters = 100*128
full_connect = tf.layers.dense(max_pooling, hidden_dim)                   #fully connected layer, shape batch_size*hidden_dim = 100*128
full_connect_dropout = tf.contrib.layers.dropout(full_connect, dropout_keep_prob)    #dropout against overfitting
full_connect_activate = tf.nn.relu(full_connect_dropout)                  #activation
softmax_before = tf.layers.dense(full_connect_activate, num_classes)      #fully connected layer, shape batch_size*num_classes = 100*100
predict_Y = tf.nn.softmax(softmax_before)                                 #softmax gives the predicted probabilities
cross_entropy = tf.nn.softmax_cross_entropy_with_logits_v2(labels=Y_holder, logits=softmax_before)    #cross-entropy loss
loss = tf.reduce_mean(cross_entropy)                 #mean loss over the batch
optimizer = tf.train.AdamOptimizer(learning_rate)    #optimizer
train = optimizer.minimize(loss)                     #minimize the loss
isCorrect = tf.equal(tf.argmax(Y_holder, 1), tf.argmax(predict_Y, 1))        #compute accuracy
accuracy = tf.reduce_mean(tf.cast(isCorrect, tf.float32))
#initialize the variables; for a neural network the key parameters are the weights W and biases b
init = tf.global_variables_initializer()
session = tf.Session()
session.run(init)
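#optional sanity check of the static shapes noted in the comments above
#(the batch dimension prints as ? because the placeholders use None):
print(embedding_inputs.shape)   #(?, 100, 64)
print(conv.shape)               #(?, 96, 128)
print(max_pooling.shape)        #(?, 128)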

#load and convert the test data
with open('test.txt', encoding='utf8') as file:
    line_list = [k.strip() for k in file.readlines()]
    test_label_list = [k.split()[0] for k in line_list]
    test_content_list = [k.split(maxsplit=1)[1] for k in line_list]
test_idlist_list = [content2idList(content) for content in test_content_list]
test_X = kr.preprocessing.sequence.pad_sequences(test_idlist_list, seq_length)
test_y = labelEncoder.transform(test_label_list)
test_Y = kr.utils.to_categorical(test_y, num_classes)        #one-hot

for i in range(10000):                                       #number of training iterations
    selected_index = random.sample(list(range(len(train_y))), k=batch_size)
    batch_X = train_X[selected_index]
    batch_Y = train_Y[selected_index]
    session.run(train, {X_holder:batch_X, Y_holder:batch_Y})  #one training step
    step = i+1
    if step % 100 == 0:
        selected_index = random.sample(list(range(len(test_y))), k=100)    #evaluate on 100 randomly drawn test samples
        batch_X = test_X[selected_index]
        batch_Y = test_Y[selected_index]
        loss_value, accuracy_value = session.run([loss, accuracy], {X_holder:batch_X, Y_holder:batch_Y})
        print('step:%d loss:%.4f accuracy:%.4f' %(step, loss_value, accuracy_value))

#save the model (create the checkpoint directory first if it does not exist)
os.makedirs('train_model', exist_ok=True)
saver = tf.train.Saver()
save_path = saver.save(session, 'train_model/fenlei_cnn.ckpt')
print('Save to path:', save_path)

def predictAll(test_X, data_size=100):
    predict_value_list = []
    for i in range(0, len(test_X), data_size):
        selected_X = test_X[i:i+data_size]
        predict_value = session.run(predict_Y, {X_holder:selected_X})
        predict_value_list.extend(predict_value)
    return np.array(predict_value_list)

#predict all of the test data
Y = predictAll(test_X)
#take the highest-scoring class for each sample (the number of candidates taken from Y can be changed)
y = np.argmax(Y, axis=1)
predict_label_list = labelEncoder.inverse_transform(y)

df=pd.DataFrame(confusion_matrix(test_label_list, predict_label_list),
            columns = labelEncoder.classes_,
            index = labelEncoder.classes_ )
#print(df)
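#in the confusion matrix above, rows are true labels and columns are predicted
#labels, so each off-diagonal cell counts confusions between one class pair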

###classification report
from sklearn.metrics import precision_recall_fscore_support

def eval_model(y_true, y_pred, labels):
    #per-class Precision, Recall, F1 and support
    p, r, f1, s = precision_recall_fscore_support(y_true, y_pred)
    #overall weighted-average Precision, Recall and F1, and total support
    tot_p = np.average(p, weights=s)
    tot_r = np.average(r, weights=s)
    tot_f1 = np.average(f1, weights=s)
    tot_s = np.sum(s)
    res1 = pd.DataFrame({
        u'Label':labels,
        u'Precision':p,
        u'Recall':r,
        u'F1':f1,
        u'Support':s
        })
    res2 = pd.DataFrame({
        u'Label':['Overall'],
        u'Precision':[tot_p],
        u'Recall':[tot_r],
        u'F1':[tot_f1],
        u'Support':[tot_s]
        })
    res2.index = [999]
    res = pd.concat([res1, res2])
    return res[['Label', 'Precision', 'Recall', 'F1', 'Support']]
tables = eval_model(test_label_list, predict_label_list, labelEncoder.classes_)
print(tables)
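
As a cross-check, scikit-learn's built-in report produces an equivalent per-class and weighted-average summary:

from sklearn.metrics import classification_report
print(classification_report(test_label_list, predict_label_list))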

Prediction Code

The Python prediction code below supports both batch prediction and single-title testing. In batch mode it reads test.txt (three tab-separated columns: id, title, label) and writes titles whose confident predictions disagree with the original label to a test_result<MMDDHHMM>.txt file; in debug mode it reads a title from stdin and prints the ten highest-scoring classes.

Run it as: python cnn_predict.py debug or python cnn_predict.py batch

#coding=utf8
import tensorflow as tf
import os
import sys
import time
import codecs
import heapq
import numpy as np
import tensorflow.contrib.keras as kr
from sklearn.preprocessing import LabelEncoder                    #label encoding


labelEncoder = LabelEncoder()                                    #label encoder
os.environ["CUDA_VISIBLE_DEVICES"] = "1"                         #make only one GPU visible
#adjust to your own data; these must match the training settings
vocab_size = 5000            #vocabulary size
seq_length = 100             #sequence length
embedding_dim = 64           #embedding dimension
num_classes = 100            #number of classes
num_filters = 128            #number of convolution filters
kernel_size = 5              #convolution kernel size
hidden_dim = 128             #fully connected layer units
dropout_keep_prob = 1        #dropout keep probability; note: keep this at 1 for inference, unlike the training setting
learning_rate = 1e-3         #learning rate
batch_size = 100             #batch size

np.set_printoptions(threshold=np.inf)                                   #print arrays in full

with open('vocab.txt', encoding='utf8') as file:                        #load the vocabulary (character) list
    vocabulary_list = [k.strip() for k in file.readlines()]
word2id_dict = dict([(b, a) for a, b in enumerate(vocabulary_list)])    #character -> id mapping
content2idList = lambda content : [word2id_dict[word] for word in content if word in word2id_dict]

with open('y_lable.txt', encoding='utf8') as file:
    train_label_list = [k.strip() for k in file.readlines()]
labelEncoder.fit_transform(train_label_list)                             #re-fit the encoder on the saved training labels
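#re-fitting on the saved label file reproduces the same label-to-id mapping as
#training; the number of classes here must equal num_classes above, since the
#restored graph was built with that output size (the training script saves the
#classes as y_lable_last.txt)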
#rebuild the network
tf.reset_default_graph()
X_holder = tf.placeholder(tf.int32, [None, seq_length])
Y_holder = tf.placeholder(tf.float32, [None, num_classes])
embedding = tf.get_variable('embedding', [vocab_size, embedding_dim])       #matrix of shape vocab_size*embedding_dim = 5000*64
embedding_inputs = tf.nn.embedding_lookup(embedding, X_holder)              #batch_size*seq_length*embedding_dim = 100*100*64
conv = tf.layers.conv1d(embedding_inputs, num_filters, kernel_size)         #shape batch_size*(seq_length-kernel_size+1)*num_filters = 100*96*128
max_pooling = tf.reduce_max(conv, reduction_indices=[1])                    #max pooling over time, shape batch_size*num_filters = 100*128
full_connect = tf.layers.dense(max_pooling, hidden_dim)                     #fully connected layer, shape batch_size*hidden_dim = 100*128
full_connect_dropout = tf.contrib.layers.dropout(full_connect, dropout_keep_prob)    #dropout (keep_prob is 1 at inference)
full_connect_activate = tf.nn.relu(full_connect_dropout)                    #activation
softmax_before = tf.layers.dense(full_connect_activate, num_classes)        #fully connected layer, shape batch_size*num_classes = 100*100
predict_Y = tf.nn.softmax(softmax_before)                                   #softmax gives the predicted probabilities
cross_entropy = tf.nn.softmax_cross_entropy_with_logits_v2(labels=Y_holder, logits=softmax_before)    #cross-entropy loss
loss = tf.reduce_mean(cross_entropy)                 #mean loss over the batch
optimizer = tf.train.AdamOptimizer(learning_rate)    #optimizer
train = optimizer.minimize(loss)                     #minimize the loss
isCorrect = tf.equal(tf.argmax(Y_holder, 1), tf.argmax(predict_Y, 1))        #compute accuracy
accuracy = tf.reduce_mean(tf.cast(isCorrect, tf.float32))
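
#note: this graph must match the training graph exactly (same variable names
#and shapes), otherwise saver.restore below cannot load the checkpoint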

session = tf.Session()
#load the trained model
saver = tf.train.Saver()
saver.restore(session, 'train_model/fenlei_cnn.ckpt')
print('model loaded successfully')

def predictAll(test_X, data_size=100):
    predict_value_list = []
    for i in range(0, len(test_X), data_size):
        selected_X = test_X[i:i+data_size]
        predict_value = session.run(predict_Y, {X_holder:selected_X})
        predict_value_list.extend(predict_value)
    return np.array(predict_value_list)

#output the top five predicted classes with their scores for each title
def format_predict5(Y):
    y_index = []
    y_value = []
    for Y_l in Y:
        index_l = heapq.nlargest(5, range(len(Y_l)), Y_l.take)        #indices of the five highest scores
        value_l = heapq.nlargest(5, Y_l)                              #the five highest scores
        y_index.append(index_l)
        y_value.append(value_l)
        #print('top five class scores:', value_l)
    for i in range(0, len(test_id_list)):
        num = 0
        flag = 0
        #apply the score thresholds to decide which predictions count
        for n in range(5):
            if test_label_list[i] in train_label_list and y_value[i][n] > 0.1:
                num += 1
            elif y_value[i][n] > 0.9:            #if the original class was not in the training set, require a score above 0.9
                num += 1
                flag = 1
        if num == 0:
            continue
        #convert the class indices to class names
        pre_Yname = []
        for ii in range(num):
            pre_Yname.append(labelEncoder.classes_[y_index[i][ii]])
        #skip titles whose original label is among the predicted classes
        if test_label_list[i] in pre_Yname:
            continue

        str1 = test_id_list[i]+'\t'+test_content_list[i]+'\t'+test_label_list[i]+'\t'+ str(flag)
        for j in range(num):
            str1 += '\t'+pre_Yname[j]+'('+str(y_value[i][j]) +')'
        str1 += '\n'
        fo.write(str1)

if __name__ == "__main__":
    if len(sys.argv) != 2:
        print ('usage: python cnn_predict.py batch(debug)')
        exit(1)
    if sys.argv[1] == 'batch':
        _time = time.strftime("%m%d%H%M", time.localtime())
        fo = codecs.open('test_result'+_time+'.txt', 'w', 'utf8')
        batch_line = []                        #buffer; one batch prediction is run per 1000 accumulated lines
        b_size = 0
        with codecs.open('test.txt', 'rb', 'utf8', 'ignore') as file:
            for line in file:
                datalist1 = line.strip().split('\t')
                if len(datalist1) != 3: # or datalist1[1] not in train_label_list:
                    continue
                batch_line.append(line.strip())
                b_size += 1
                if b_size == 1000:
                    test_id_list = [k.split('\t')[0] for k in batch_line]
                    test_label_list = [k.split('\t')[2] for k in batch_line]
                    test_content_list = [k.split('\t')[1] for k in batch_line]
                    test_idlist_list = [content2idList(content) for content in test_content_list]
                    test_X = kr.preprocessing.sequence.pad_sequences(test_idlist_list, seq_length)

                    #predict
                    Y = predictAll(test_X)
                    format_predict5(Y)
                    batch_line.clear()
                    b_size = 0
            #flush any leftover lines (fewer than 1000) once the file is exhausted
            if batch_line:
                test_id_list = [k.split('\t')[0] for k in batch_line]
                test_label_list = [k.split('\t')[2] for k in batch_line]
                test_content_list = [k.split('\t')[1] for k in batch_line]
                test_idlist_list = [content2idList(content) for content in test_content_list]
                test_X = kr.preprocessing.sequence.pad_sequences(test_idlist_list, seq_length)
                Y = predictAll(test_X)
                format_predict5(Y)
            fo.close()
    elif sys.argv[1] == 'debug':
        while(1):
            title = input("title:")
            if not title.strip():
                continue
            title_idlist_list = [content2idList(title.strip())]
            test_X = kr.preprocessing.sequence.pad_sequences(title_idlist_list, seq_length)
            selected_X = test_X[0:10]
            predict_value = session.run(predict_Y, {X_holder:selected_X})
            index_l = heapq.nlargest(10, range(len(predict_value[0])), predict_value[0].take)        #indices of the ten highest scores
            value_l = heapq.nlargest(10, predict_value[0])
            for i in range(10):
                line = '  '+str(i+1)+'. '+labelEncoder.classes_[index_l[i]]+'('+str(value_l[i]) +')'
                print (line)
            

Finally, a few screenshots of the results (images not reproduced here):

[Figure: training log showing step, loss, and accuracy]

[Figure: per-class precision, recall, and F1 report]

[Figure: debug mode printing ten predictions for a single title]

The classifier works at the character level, with no word segmentation, and the two scripts can be run independently. On the 100-class test set, the average accuracy reaches 95%. Questions are welcome in the comments.

