1、目录结构
2、入口类
# -*- coding: utf-8 -*-
"""
Command-line entry point for the poem / lyric writer.

Notes carried over from the original header:
  * An RNN consumes sequential data.
  * The network is mainly built from several LSTM units and is unrolled
    through time with the BPTT algorithm.
  * An LSTM has keep and forget gates and is a multi-input, multi-output
    structure; it helps to suppress gradient problems.
"""
import argparse
import sys

# Makes the ``inference`` package importable.
# NOTE(review): machine-specific absolute path -- adjust it for your checkout.
sys.path.append(r'D:\study\python-数据分析\深度学习\RNN网络\inference')


def parse_args(argv=None):
    """Parse the command-line options.

    :param argv: optional list of argument strings; when ``None`` (the
        default) argparse reads ``sys.argv[1:]`` exactly as before.
    :return: namespace with ``write`` ('poem' or 'lyric') and ``train`` (bool).
    """
    parser = argparse.ArgumentParser(description='Intelligence Poem and Lyric Writer.')

    help_ = 'you can set this value in terminal --write value can be poem or lyric.'
    parser.add_argument('-w', '--write', default='poem', choices=['poem', 'lyric'], help=help_)

    help_ = 'choose to train or generate.'
    # --train switches to training, --no-train to generation; the last one given wins.
    parser.add_argument('--train', dest='train', action='store_true', help=help_)
    parser.add_argument('--no-train', dest='train', action='store_false', help=help_)
    parser.set_defaults(train=False)

    return parser.parse_args(argv)


if __name__ == '__main__':
    args = parse_args()
    # The heavy modules are imported lazily so only the selected writer is loaded.
    if args.write == 'poem':
        from inference import tang_poems
        tang_poems.main(args.train)
    elif args.write == 'lyric':
        from inference import song_lyrics
        song_lyrics.main(args.train)
    else:
        # Defensive only: argparse ``choices`` already rejects other values.
        print('[INFO] write option can only be poem or lyric right now.')
3、tang_poems.py

# -*- coding: utf-8 -*-
# file: tang_poems.py
"""Train a character-level RNN on a poem corpus and sample poems from it."""
import collections
import os
import sys
import numpy as np
import tensorflow as tf
from models.model import rnn_model
from dataset.poems import process_poems, generate_batch
import heapq

tf.app.flags.DEFINE_integer('batch_size', 64, 'batch size.')
tf.app.flags.DEFINE_float('learning_rate', 0.01, 'learning rate.')

# set this to 'main.py' relative path
tf.app.flags.DEFINE_string('checkpoints_dir', os.path.abspath('./checkpoints/poems/'), 'checkpoints save path.')
tf.app.flags.DEFINE_string('file_path', os.path.abspath('./dataset/data/poems.txt'), 'file name of poems.')

tf.app.flags.DEFINE_string('model_prefix', 'poems', 'model save prefix.')

tf.app.flags.DEFINE_integer('epochs', 50, 'train how many epochs.')

FLAGS = tf.app.flags.FLAGS

start_token = 'G'
end_token = 'E'


def run_training():
    """Train the LSTM model, resuming from the newest checkpoint if one exists.

    Checkpoints are written to ``FLAGS.checkpoints_dir`` with the prefix
    ``FLAGS.model_prefix`` -- the same directory that is searched when
    resuming, so an interrupted or finished run can actually be continued.
    Pass ``--checkpoints_dir=./model/`` to keep using checkpoints that an
    earlier version of this script wrote there.
    """
    # Creates the directory together with any missing parents.
    os.makedirs(FLAGS.checkpoints_dir, exist_ok=True)
    checkpoint_prefix = os.path.join(FLAGS.checkpoints_dir, FLAGS.model_prefix)

    # 1. Turn the corpus into lists of character ids.
    poems_vector, word_to_int, vocabularies = process_poems(FLAGS.file_path)
    # 2. Group the poems into padded (input, target) batches.
    batches_inputs, batches_outputs = generate_batch(FLAGS.batch_size, poems_vector, word_to_int)

    input_data = tf.placeholder(tf.int32, [FLAGS.batch_size, None])
    output_targets = tf.placeholder(tf.int32, [FLAGS.batch_size, None])

    # 3. Build the model. The batch size must match the placeholders above,
    #    so it comes from the flag rather than a literal.
    end_points = rnn_model(model='lstm', input_data=input_data, output_data=output_targets,
                           vocab_size=len(vocabularies), rnn_size=128, num_layers=2,
                           batch_size=FLAGS.batch_size, learning_rate=FLAGS.learning_rate)

    saver = tf.train.Saver(tf.global_variables())
    init_op = tf.group(tf.global_variables_initializer(), tf.local_variables_initializer())

    # 4. Train.
    with tf.Session() as sess:
        sess.run(init_op)

        start_epoch = 0
        checkpoint = tf.train.latest_checkpoint(FLAGS.checkpoints_dir)
        if checkpoint:
            saver.restore(sess, checkpoint)
            print("[INFO] restore from the checkpoint {0}".format(checkpoint))
            # The global step (= epoch) is the suffix after the last '-'.
            start_epoch += int(checkpoint.split('-')[-1])
        print('[INFO] start training...')

        # Bound up front so the interrupt handler below can always refer to it.
        epoch = start_epoch
        try:
            for epoch in range(start_epoch, FLAGS.epochs):
                for batch, (x_batch, y_batch) in enumerate(zip(batches_inputs, batches_outputs)):
                    loss, _, _ = sess.run([
                        end_points['total_loss'],
                        end_points['last_state'],
                        end_points['train_op']
                    ], feed_dict={input_data: x_batch, output_targets: y_batch})
                    print('[INFO] Epoch: %d , batch: %d , training loss: %.6f' % (epoch, batch, loss))
                if epoch % 6 == 0:
                    saver.save(sess, checkpoint_prefix, global_step=epoch)
        except KeyboardInterrupt:
            print('[INFO] Interrupt manually, try saving checkpoint for now...')
            saver.save(sess, checkpoint_prefix, global_step=epoch)
            print('[INFO] Last epoch were saved, next time will start from epoch {}.'.format(epoch))


def to_word(predict, vocabs):
    """Sample one entry of ``vocabs`` with probability proportional to ``predict``.

    :param predict: array of non-negative weights (flattened by ``cumsum``).
    :param vocabs: indexable sequence of words.
    :return: the sampled word; an index past the end of ``vocabs`` is clamped
        to the last entry.
    """
    cumulative = np.cumsum(predict)
    total = np.sum(predict)
    sample = int(np.searchsorted(cumulative, np.random.rand() * total))
    # ``>=`` rather than ``>``: ``sample == len(vocabs)`` is already out of range.
    if sample >= len(vocabs):
        sample = len(vocabs) - 1
    return vocabs[sample]


def gen_poem(begin_word):
    """Generate one poem with the newest checkpoint in ``FLAGS.checkpoints_dir``.

    :param begin_word: first character of the poem; when empty the first
        character is sampled from the model.
    :return: the generated text without the start / end tokens.
    :raises FileNotFoundError: if no checkpoint can be found.
    """
    batch_size = 1
    print('[INFO] loading corpus from %s' % FLAGS.file_path)
    poems_vector, word_int_map, vocabularies = process_poems(FLAGS.file_path)

    input_data = tf.placeholder(tf.int32, [batch_size, None])

    end_points = rnn_model(model='lstm', input_data=input_data, output_data=None,
                           vocab_size=len(vocabularies), rnn_size=128, num_layers=2,
                           batch_size=batch_size, learning_rate=FLAGS.learning_rate)

    saver = tf.train.Saver(tf.global_variables())
    init_op = tf.group(tf.global_variables_initializer(), tf.local_variables_initializer())
    with tf.Session() as sess:
        sess.run(init_op)

        # Always load the newest checkpoint instead of one fixed step number.
        checkpoint = tf.train.latest_checkpoint(FLAGS.checkpoints_dir)
        if not checkpoint:
            raise FileNotFoundError(
                'no checkpoint found in %s, train the model first.' % FLAGS.checkpoints_dir)
        saver.restore(sess, checkpoint)

        # Prime the network with the start token.
        x = np.array([[word_int_map[start_token]]], dtype=np.int32)
        [predict, last_state] = sess.run([end_points['prediction'], end_points['last_state']],
                                         feed_dict={input_data: x})
        if begin_word:
            word = begin_word
        else:
            word = to_word(predict, vocabularies)

        # Fallback id for characters missing from the vocabulary.
        # NOTE(review): assumes the model reserves one extra embedding row
        # (vocab_size + 1), as dataset/poems.py does for unknown characters.
        unknown_id = len(vocabularies)

        poem = ''
        while word != end_token:
            print ('runing')
            poem += word
            x = np.array([[word_int_map.get(word, unknown_id)]], dtype=np.int32)
            # Feed the previous state back so generation continues the sequence.
            [predict, last_state] = sess.run([end_points['prediction'], end_points['last_state']],
                                             feed_dict={input_data: x, end_points['initial_state']: last_state})
            word = to_word(predict, vocabularies)
        return poem


def pretty_print_poem(poem):
    """Print every sentence of ``poem`` longer than ten characters on its own line."""
    for sentence in poem.split('。'):
        if len(sentence) > 10:
            print(sentence + '。')


def main(is_train):
    """Train the model when ``is_train`` is truthy, otherwise write a poem."""
    if is_train:
        print('[INFO] train tang poem...')
        run_training()
    else:
        print('[INFO] write tang poem...')
        begin_word = input('输入起始字:')
        poem2 = gen_poem(begin_word)
        pretty_print_poem(poem2)


if __name__ == '__main__':
    tf.app.run()
4、inference中poems.py

"""Work-in-progress training / test entry for the poem model."""
import numpy as np
import tensorflow as tf
from models.model import rnn_model
from dataset.poems import process_poems,generate_batch

tf.app.flags.DEFINE_integer('batch_size',64,'batch size = ?')
tf.app.flags.DEFINE_float('learning_rate',0.01,'learning_rate')
tf.app.flags.DEFINE_string('check_pointss_dir','./model/','check_pointss_dir')
tf.app.flags.DEFINE_string('file_path','./data/.txt','file_path')
tf.app.flags.DEFINE_integer('epoch',50,'train epoch')

start_token = 'G'
end_token = 'E'
FLAGS = tf.app.flags.FLAGS


def run_training():
    """Load the corpus, build the batches and construct the training graph.

    Only the graph is built here; no training loop is run yet.
    """
    poems_vector, word_to_int, vocabularies = process_poems(FLAGS.file_path)
    batch_inputs, batch_outputs = generate_batch(FLAGS.batch_size, poems_vector, word_to_int)

    input_data = tf.placeholder(tf.int32, [FLAGS.batch_size, None])
    output_targets = tf.placeholder(tf.int32, [FLAGS.batch_size, None])

    # Keyword names must match rnn_model's signature (``input_data`` and
    # ``rnn_size``); batch size and learning rate follow the flags so the
    # model agrees with the placeholders above.
    end_points = rnn_model(model='lstm', input_data=input_data, output_data=output_targets,
                           vocab_size=len(vocabularies), rnn_size=128, num_layers=2,
                           batch_size=FLAGS.batch_size, learning_rate=FLAGS.learning_rate)


def main(is_train):
    """Build the training graph when ``is_train`` is truthy, else prompt for a word."""
    if is_train:
        print ('training')
        run_training()
    else:
        print ('test')
        begin_word = input('word')


if __name__ == '__main__':
    tf.app.run()
5、model.py

# -*- coding: utf-8 -*-
# file: model.py
import tensorflow as tf
import numpy as np


def _build_cell(model, rnn_size):
    """Return one new recurrent cell of the requested kind."""
    if model == 'rnn':
        return tf.contrib.rnn.BasicRNNCell(rnn_size)
    if model == 'gru':
        return tf.contrib.rnn.GRUCell(rnn_size)
    if model == 'lstm':
        # Only the LSTM cell takes ``state_is_tuple``.
        return tf.contrib.rnn.BasicLSTMCell(rnn_size, state_is_tuple=True)
    raise ValueError("model must be 'rnn', 'gru' or 'lstm', got %r" % (model,))


def rnn_model(model, input_data, output_data, vocab_size, rnn_size=128, num_layers=2, batch_size=64,
              learning_rate=0.01):
    """
    construct rnn seq2seq model.

    :param model: cell type, one of 'rnn', 'gru' or 'lstm'
    :param input_data: input data placeholder (integer character ids)
    :param output_data: output data placeholder; ``None`` builds the
        generation graph instead of the training graph
    :param vocab_size: vocabulary size; one extra id is reserved on top of it
    :param rnn_size: units per recurrent layer, also the embedding width
    :param num_layers: number of stacked recurrent layers
    :param batch_size: batch size of the initial state when training
        (generation always uses a batch of 1)
    :param learning_rate: learning rate of the Adam optimizer
    :return: dict of graph end points. Training: 'initial_state', 'output',
        'train_op', 'total_loss', 'loss', 'last_state'. Generation:
        'initial_state', 'last_state', 'prediction'.
    :raises ValueError: if ``model`` is not a supported cell type
    """
    end_points = {}

    # 1. Stack the recurrent layers. Every layer gets its own cell object so
    #    that no cell instance is shared between layers.
    #    NOTE(review): checkpoints written by a graph that reused one cell for
    #    all layers may not restore into this graph -- retrain if they do not.
    cell = tf.contrib.rnn.MultiRNNCell(
        [_build_cell(model, rnn_size) for _ in range(num_layers)], state_is_tuple=True)

    # 2. Initial state: a full batch for training, a single sequence otherwise.
    if output_data is not None:
        initial_state = cell.zero_state(batch_size, tf.float32)
    else:
        initial_state = cell.zero_state(1, tf.float32)

    # 3. The embedding lookup is pinned to the CPU.
    with tf.device("/cpu:0"):
        embedding = tf.get_variable('embedding', initializer=tf.random_uniform(
            [vocab_size + 1, rnn_size], -1.0, 1.0))
        inputs = tf.nn.embedding_lookup(embedding, input_data)

    # [batch_size, ?, rnn_size] = [64, ?, 128]
    outputs, last_state = tf.nn.dynamic_rnn(cell, inputs, initial_state=initial_state)
    output = tf.reshape(outputs, [-1, rnn_size])

    # 4. Project every time step onto the vocabulary.
    weights = tf.Variable(tf.truncated_normal([rnn_size, vocab_size + 1]))
    bias = tf.Variable(tf.zeros(shape=[vocab_size + 1]))
    logits = tf.nn.bias_add(tf.matmul(output, weights), bias=bias)
    # [?, vocab_size+1]

    # 5. Loss and optimizer (training) or softmax prediction (generation).
    if output_data is not None:
        # The targets are one-hot encoded to match the logits: [?, vocab_size+1]
        labels = tf.one_hot(tf.reshape(output_data, [-1]), depth=vocab_size + 1)
        loss = tf.nn.softmax_cross_entropy_with_logits(labels=labels, logits=logits)
        total_loss = tf.reduce_mean(loss)
        train_op = tf.train.AdamOptimizer(learning_rate).minimize(total_loss)

        end_points['initial_state'] = initial_state
        end_points['output'] = output
        end_points['train_op'] = train_op
        end_points['total_loss'] = total_loss
        end_points['loss'] = loss
        end_points['last_state'] = last_state
    else:
        prediction = tf.nn.softmax(logits)

        end_points['initial_state'] = initial_state
        end_points['last_state'] = last_state
        end_points['prediction'] = prediction

    return end_points
6、dataset中poems.py

# -*- coding: utf-8 -*-
# file: poems.py
import collections
import os
import sys
import numpy as np

start_token = 'G'
end_token = 'E'

# A poem containing any of these characters is treated as dirty data.
# The full-width '(' is listed next to the ASCII '(' because the original
# check tested the same parenthesis twice.
_FORBIDDEN_CHARS = ('_', '(', '(', '《', '[', start_token, end_token)


def process_poems(file_name):
    """
    Read a poem corpus and convert every poem to a list of character ids.

    Each line must look like ``title:content``; lines that do not split into
    exactly two parts on ':' are skipped, as are poems that contain a
    forbidden character or whose content is shorter than 5 or longer than 79
    characters. Kept poems are wrapped in the start / end tokens and sorted
    by length, shortest first.

    :param file_name: path of the UTF-8 encoded corpus file
    :return: ``(poems_vector, word_int_map, words)`` -- the poems as id
        lists, the character -> id mapping, and the tuple of characters
        ordered by descending frequency with a trailing ' ' used for padding
    :raises ValueError: if the file contains no usable poem
    """
    poems = []
    with open(file_name, "r", encoding='utf-8') as f:
        for line in f:
            parts = line.strip().split(':')
            if len(parts) != 2:
                # Malformed line (no title separator or more than one): skip it.
                continue
            content = parts[1].replace(' ', '')
            if any(ch in content for ch in _FORBIDDEN_CHARS):
                continue
            if len(content) < 5 or len(content) > 79:
                continue
            poems.append(start_token + content + end_token)

    if not poems:
        raise ValueError('no usable poems found in %s' % file_name)

    # Sort by the length of each poem so a batch holds poems of similar
    # length; the sort is stable, so equal lengths keep file order.
    poems = sorted(poems, key=len)

    # Count how often every character occurs.
    counter = collections.Counter()
    for poem in poems:
        counter.update(poem)
    count_pairs = sorted(counter.items(), key=lambda pair: -pair[1])
    words, _ = zip(*count_pairs)

    # Keep every character and append ' ' as the padding symbol.
    words = words + (' ',)
    # Map every character to an integer id (its frequency rank).
    word_int_map = dict(zip(words, range(len(words))))
    unknown_id = len(words)
    poems_vector = [[word_int_map.get(word, unknown_id) for word in poem] for poem in poems]

    return poems_vector, word_int_map, words


def generate_batch(batch_size, poems_vec, word_to_int):
    """
    Split the poems into padded input / target batches.

    :param batch_size: number of poems per batch; a trailing partial batch
        is dropped
    :param poems_vec: poems as lists of character ids
    :param word_to_int: character -> id mapping; must contain ' ' (padding)
    :return: ``(x_batches, y_batches)``, two lists of int32 arrays of shape
        ``(batch_size, longest poem in that batch)``
    """
    n_chunk = len(poems_vec) // batch_size
    pad_id = word_to_int[' ']
    x_batches = []
    y_batches = []
    for i in range(n_chunk):
        start_index = i * batch_size
        batches = poems_vec[start_index:start_index + batch_size]
        # Pad every poem of the batch up to the longest one.
        length = max(map(len, batches))
        x_data = np.full((batch_size, length), pad_id, np.int32)
        for row, poem in enumerate(batches):
            x_data[row, :len(poem)] = poem
        # The target is the input shifted one step to the left; the last
        # column keeps the input's last value:
        #   x_data           y_data
        #   [6,2,4,6,9]      [2,4,6,9,9]
        #   [1,4,2,8,5]      [4,2,8,5,5]
        y_data = np.copy(x_data)
        y_data[:, :-1] = x_data[:, 1:]
        x_batches.append(x_data)
        y_batches.append(y_data)
    return x_batches, y_batches
7、clean_cn.py

# -*- coding: utf-8 -*- # file: clean_cn.py """ this script using for clean Chinese corpus. you can set level for clean, i.e.: level='all', will clean all character that not Chinese, include punctuations level='normal', this will generate corpus like normal use, reserve alphabets and numbers level='clean', this will remove all except Chinese and Chinese punctuations besides, if you want remove complex Chinese characters, just set this to be true: simple_only=True """ import numpy as np import os import string cn_punctuation_set = [',', '。', '!', '?', '"', '"', '、'] en_punctuation_set = [',', '.', '?', '!', '"', '"'] def clean_cn_corpus(file_name, clean_level='all', simple_only=True, is_save=True): """ clean Chinese corpus. :param file_name: :param clean_level: :param simple_only: :param is_save: :return: clean corpus in list type. """ if os.path.dirname(file_name): base_dir = os.path.dirname(file_name) else: print('not set dir. please check') save_file = os.path.join(base_dir, os.path.basename(file_name).split('.')[0] + '_cleaned.txt') with open(file_name, 'r+') as f: clean_content = [] for l in f.readlines(): l = l.strip() if l == '': pass else: l = list(l) should_remove_words = [] for w in l: if not should_reserve(w, clean_level): should_remove_words.append(w) clean_line = [c for c in l if c not in should_remove_words] clean_line = ''.join(clean_line) if clean_line != '': clean_content.append(clean_line) if is_save: with open(save_file, 'w+') as f: for l in clean_content: f.write(l + '\n') print('[INFO] cleaned file have been saved to %s.' 
% save_file) return clean_content def should_reserve(w, clean_level): if w == ' ': return True else: if clean_level == 'all': # only reserve Chinese characters if w in cn_punctuation_set or w in string.punctuation or is_alphabet(w): return False else: return is_chinese(w) elif clean_level == 'normal': # reserve Chinese characters, English alphabet, number if is_chinese(w) or is_alphabet(w) or is_number(w): return True elif w in cn_punctuation_set or w in en_punctuation_set: return True else: return False elif clean_level == 'clean': if is_chinese(w): return True elif w in cn_punctuation_set: return True else: return False else: raise "clean_level not support %s, please set for all, normal, clean" % clean_level def is_chinese(uchar): """is chinese""" if u'\u4e00' <= uchar <= u'\u9fa5': return True else: return False def is_number(uchar): """is number""" if u'\u0030' <= uchar <= u'\u0039': return True else: return False def is_alphabet(uchar): """is alphabet""" if (u'\u0041' <= uchar <= u'\u005a') or (u'\u0061' <= uchar <= u'\u007a'): return True else: return False def semi_angle_to_sbc(uchar): """半角转全角""" inside_code = ord(uchar) if inside_code < 0x0020 or inside_code > 0x7e: return uchar if inside_code == 0x0020: inside_code = 0x3000 else: inside_code += 0xfee0 return chr(inside_code) def sbc_to_semi_angle(uchar): """全角转半角""" inside_code = ord(uchar) if inside_code == 0x3000: inside_code = 0x0020 else: inside_code -= 0xfee0 if inside_code < 0x0020 or inside_code > 0x7e: return uchar return chr(inside_code)