GAN網絡,利用gan網絡完成對一維數據點的生成


代碼:

import argparse
import numpy as np
from scipy.stats import norm
import tensorflow as tf
import matplotlib.pyplot as plt
from matplotlib import animation
import seaborn as sns
sns.set(color_codes=True)

seed = 42
np.random.seed(seed)
tf.set_random_seed(seed)


class DataDistribution(object): # 真實數據
def __init__(self):
self.mu = 4
self.sigma = 0.5
def sample(self, N):
samples = np.random.normal(self.mu, self.sigma, N)
samples.sort()
return samples

class GeneratorDistibution(object): # 隨機噪音點,初始化輸入
def __init__(self, range):
self.range = range
def sample(self, N):
return np.linspace(-self.range, self.range, N) + np.random.normal(N) * 0.01

def linear(input, output_dim, scope=None, stddev=1.0): # 單網絡層
norm = tf.random_normal_initializer(stddev=stddev)
const = tf.constant_initializer(0.0)
with tf.variable_scope(scope or 'linear'): # 初始化 w, b參數
w = tf.get_variable('w', [input.get_shape()[1], output_dim], initializer=norm)
b = tf.get_variable('b', [output_dim], initializer=const)
return tf.matmul(input, w) + b

def generator(input, h_dim):
h0 = tf.nn.softplus(linear(input, h_dim, 'g0'))
h1 = linear(h0, 1, 'g1')
return h1

def discriminator(input, h_dim): # 預訓練判別D網絡
h0 = tf.tanh(linear(input, h_dim * 2, 'd0'))
h1 = tf.tanh(linear(h0, h_dim * 2, 'd1'))
h2 = tf.tanh(linear(h1, h_dim * 2, scope='d2'))

h3 = tf.sigmoid(linear(h2, 1, scope='d3')) # 輸出結果 0/1
return h3

def optimizer(loss, var_list, initial_learning_rate): # 學習率不斷衰減的學習策略
decay = 0.95
num_deacy_steps = 150 # 每迭代150次進行一次學習率衰減
batch = tf.Variable(0)
learning_rate = tf.train.exponential_decay(
initial_learning_rate,
batch,
num_deacy_steps,
decay,
staircase=True
)
optimizer = tf.train.GradientDescentOptimizer(learning_rate).minimize(
loss,
global_step=batch,
var_list=var_list
)
return optimizer

class GAN(object): # 構造模型 G生成網絡是希望生成的值(本質上是Falee)被判別網絡認定為True D判別網絡是希望分清輸入的是True還是False
def __init__(self, data, gen, num_steps, batch_size, log_every):
self.data = data
self.gen = gen
self.num_steps = num_steps
self.batch_size = batch_size
self.log_every = log_every
self.mlp_hidden_size = 4 # 隱層神經元個數
self.learning_rate = 0.03
self._create_model()
def _create_model(self):

with tf.variable_scope('D_pre'): # 作用域 預訓練判別D網絡 作用:訓練該網絡是希望可以拿出一組還不錯的參數來初始化真正的判別網絡
self.pre_input = tf.placeholder(tf.float32, shape=(self.batch_size, 1))
self.pre_labels = tf.placeholder(tf.float32, shape=(self.batch_size, 1))
D_pre = discriminator(self.pre_input, self.mlp_hidden_size) # 預訓練判別D網絡
self.pre_loss = tf.reduce_mean(tf.square(D_pre - self.pre_labels)) # 損失函數
self.pre_opt = optimizer(self.pre_loss, None, self.learning_rate)


with tf.variable_scope('Gen'): # G網絡,用來生成模仿的值 x-->G(x)
self.z = tf.placeholder(tf.float32, shape=(self.batch_size, 1))
self.G = generator(self.z, self.mlp_hidden_size)

with tf.variable_scope('Disc') as scope: # D網絡 判別網絡
self.x = tf.placeholder(tf.float32, shape=(self.batch_size, 1))
self.D1 = discriminator(self.x, self.mlp_hidden_size) # 真實數據輸入得到網絡輸出
scope.reuse_variables() # 使用相同的網絡參數
self.D2 = discriminator(self.G, self.mlp_hidden_size) # 生成網絡輸入得到網絡輸出

# 定義GAN網絡的損失函數
self.loss_d = tf.reduce_mean(-tf.log(self.D1) - tf.log(1-self.D2)) # 希望D1 --> 1 D2 --> 0
self.loss_g = tf.reduce_mean(-tf.log(self.D2)) # 希望D2 --> 1

# 初始化參數
self.d_pre_params = tf.get_collection(tf.GraphKeys.TRAINABLE_VARIABLES, scope='D_pre')
self.d_params = tf.get_collection(tf.GraphKeys.TRAINABLE_VARIABLES, scope='Disc')
self.g_params = tf.get_collection(tf.GraphKeys.TRAINABLE_VARIABLES, scope='Gen')

self.opt_d = optimizer(self.loss_d, self.d_params, self.learning_rate)
self.opt_g = optimizer(self.loss_g, self.g_params, self.learning_rate)

# 訓練模型
def train(self):
with tf.Session() as session:
tf.global_variables_initializer().run() # 初始化全局變量

num_pretrain_steps = 1000
for step in range(num_pretrain_steps): # 先訓練D_pre網絡
d = (np.random.random(self.batch_size) - 0.5) * 10.0 # 隨機初始化
labels = norm.pdf(d, loc=self.data.mu, scale=self.data.sigma) # 根據d得到高斯值的生成
pretrain_loss, _ = session.run([self.pre_loss, self.pre_opt], {
self.pre_input: np.reshape(d, (self.batch_size, 1)),
self.pre_labels: np.reshape(labels, (self.batch_size, 1))
})
self.weightsD = session.run(self.d_pre_params) # 得到D_pre網絡的參數

# 對D網絡進行參數初始化
for i, v in enumerate(self.d_params):
session.run(v.assign(self.weightsD[i]))

# 訓練對抗神經網絡
for step in range(self.num_steps):
x = self.data.sample(self.batch_size)
z = self.gen.sample(self.batch_size)
loss_d, _ = session.run([self.loss_d, self.opt_d], {
self.x: np.reshape(x, (self.batch_size, 1)),
self.z: np.reshape(z, (self.batch_size, 1))
})

z = self.gen.sample(self.batch_size)
loss_g, _ = session.run([self.loss_g, self.opt_g], {
self.z: np.reshape(z, (self.batch_size, 1))
})

if step % self.log_every == 0:
print('{}; loss_d:{},\tloss_g:{}'.format(step, loss_d, loss_g)) # 打印loss信息
if step % 100 == 0 or step == 0 or step == self.num_steps - 1:
self._plot_distribitions(session)

def _samples(self, session, num_points=10000, num_bins=100):
xs = np.linspace(-self.gen.range, self.gen.range, num_points)
bins = np.linspace(-self.gen.range, self.gen.range, num_bins)

d = self.data.sample(num_points)
pd, _ = np.histogram(d, bins=bins, density=True)

zs = np.linspace(-self.gen.range, self.gen.range, num_points)
g = np.zeros((num_points, 1))
for i in range(num_points // self.batch_size):
g[self.batch_size * i : self.batch_size * (i+1)] = session.run(self.G, {
self.z: np.reshape(
zs[self.batch_size * i: self.batch_size * (i+1)],
(self.batch_size, 1)
)
})
pg, _ = np.histogram(g, bins=bins, density=True)
return pd, pg

def _plot_distribitions(self, session):
pd, pg = self._samples(session)
p_x = np.linspace(-self.gen.range, self.gen.range,len(pd))
f, ax = plt.subplots(1)
ax.set_ylim(0, 1)
plt.plot(p_x, pd, label='real data')
plt.plot(p_x, pg, label='generated data')
plt.xlabel('Data values')
plt.ylabel('probility density')
plt.legend()
plt.show()

def main(args):
model = GAN(
DataDistribution(),
GeneratorDistibution(range=8),
args.num_steps, # 迭代次數
args.batch_size, # 一次迭代數據量
args.log_every, # 間隔多少次輸出loss信息
)
model.train()

def parse_args(): # 參數
parser = argparse.ArgumentParser()
parser.add_argument('--num-steps', type=int, default=1200)
parser.add_argument('--batch-size', type=int, default=12)
parser.add_argument('--log-every', type=int, default=10)
return parser.parse_args()

if __name__ == '__main__':
main(parse_args())


輸出結果:

 
       


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM