總結回顧一下近期學習的RL算法,並給部分實現算法整理了流程圖、貼了代碼。
1. value-based 基於價值的算法
基於價值算法是通過對agent所屬的environment的狀態或者狀態動作對進行評分。對於已經訓練好的模型,agent只需要根據價值函數對當前狀態選擇評分最高的動作即可;對於正在訓練的模型,我們通常將目標值(真實行動帶來的反饋)和價值函數的預測值的差距作為loss訓練價值函數。
通常使用兩種價值函數:
-
狀態價值函數 V(s),策略為 π 的狀態-值函數,即狀態s下預計累計回報的期望值,滿足:
-
狀態行動價值函數 Q(s, a),策略為 π 的狀態-動作值函數,即狀態s下采取行動a預計累計回報的期望值,滿足:
value-based通常和貪婪策略一起使用,網絡輸出動作的價值后選擇最大價值的方法,當最優策略是隨機策略(比如環境是剪刀石頭布)時,往往效果不佳;同時由於需要輸出獎勵使得無法將動作映射到一個分布中,因此對於連續動作無能為力。
1.1 Q-learning
最經典的value-based算法,通過Q-learning可以很好地體驗到基於價值方法的優缺點。使用Q table作為價值函數Q(s, a)的載體,算法模型如下:
Agent代碼如下:
"""
Q-learning
"""
class Agent:
def __init__(self, actions, learning_rate, reward_decay, e_greedy):
self.actions = actions
self.lr = learning_rate
self.gamma = reward_decay
self.epsilon = e_greedy
self.q_table = pd.DataFrame(
columns=self.actions,
dtype=np.float
)
def choose_action(self, observation):
self.check_state_exist(observation)
if np.random.uniform() < self.epsilon:
state_actions = self.q_table.loc[observation, :]
action = np.random.choice(state_actions[state_actions == np.max(state_actions)].index)
else:
action = np.random.choice(self.actions)
return action
def learn(self, s, a, r, s_):
self.check_state_exist(s_)
q_predict = self.q_table.loc[s, a]
if s_ != 'end':
q_target = r + self.gamma * self.q_table.loc[s_, :].max()
else:
q_target = r
self.q_table.loc[s, a] += self.lr * (q_target - q_predict)
def check_state_exist(self, state):
if state not in self.q_table.index:
self.q_table = self.q_table.append(
pd.Series(
[0] * len(self.actions),
index=self.q_table.columns,
name=state
)
)
1.2 Sarsa
Sarsa大體於Q learning類似,不過在流程上在計算loss前先選擇了下一步的動作next_action,然后再進行loss計算,這使得Sarsa學習的狀態動作對都屬於當前的軌跡,屬於在線學習on-policy。
Q learning和Sarsa在測試時,效果相比起來:
- Q learning更強調該狀態和目標點的距離遠近,若距離較近,maxa'Q(s', a')則值很大,導致這個狀態的所有動作的值都偏大;
- Sarsa更強調狀態動作對,如果狀態離目標點更近還不行,只有可以或者已經到達目標點的路徑的值才會偏大。
所以Q learning的路徑會比較激進,偏向於探索,因為如果該狀態其中一個動作值較高,其他動作值也相應較高,而sarsa則比較保守,如果該狀態其中一個動作值較高,會傾向於走那條值較高的道路,探索性較低。
優化版本Sarsa Lambda一次到達后同時更新多步,但是也更加保守了,因為多個狀態動作對受影響。
Agent代碼如下:
"""
Sarsa Lambda
"""
class Agent:
def __init__(self, actions, learning_rate, reward_decay, e_greedy, sarsa_lambda):
self.actions = actions
self.lr = learning_rate
self.gamma = reward_decay
self.epsilon = e_greedy
self.q_table = pd.DataFrame(
columns=self.actions,
dtype=np.float
)
self.lambda_ = sarsa_lambda
self.eligibility_trace = self.q_table.copy()
def choose_action(self, observation):
self.check_state_exist(observation)
if np.random.uniform() < self.epsilon:
state_actions = self.q_table.loc[observation, :]
action = np.random.choice(state_actions[state_actions == np.max(state_actions)].index)
else:
action = np.random.choice(self.actions)
return action
def learn(self, s, a, r, s_, a_):
self.check_state_exist(s_)
q_predict = self.q_table.loc[s, a]
if s_ != 'end':
q_target = r + self.gamma * self.q_table.loc[s_, a_]
else:
q_target = r
error = q_target - q_predict
self.eligibility_trace.loc[s, :] *= 0
self.eligibility_trace.loc[s, a] = 1
self.q_table += self.lr * error * self.eligibility_trace
self.eligibility_trace *= self.gamma * self.lambda_
def check_state_exist(self, state):
if state not in self.q_table.index:
to_be_append = pd.Series(
[0] * len(self.actions),
index=self.q_table.columns,
name=state
)
self.q_table = self.q_table.append(to_be_append)
self.eligibility_trace = self.eligibility_trace.append(to_be_append)
1.3 DQN(Deep Q-Network)及其優化版本
1.3.1 Nature DQN
在Q learning的基礎上添加了三個新特性:
- 神經網絡Q Network代替Q Table
- 記憶庫用於經驗回放
- Q Network和Target Q Network分離
算法如下:
DQN引入了神經網絡,將Q table替換為Q Network,解決高維狀態動作對帶來的數據量過多Q table無法存儲的問題。使用神經網絡的思想,使輸入的狀態動作對和輸出的Q值變成一個函數,通過訓練來擬合。
DQN帶來的新問題以及解決方法: - 神經網絡的數據標記:使用了Q learning的思想,將目標值(真實行動帶來的反饋)作為label。
- 分布需要獨立:經驗池回放 Experience Replay; 目標網絡 Target Q Network 和預測網絡 Q Network 分離(數據獨立可以減小方差)
Q:為什么DQN需要隔一段時間后才更新目標網絡?
A:目標網絡用來評價狀態動作對,即相當於神經網絡中的標簽。如果每次更新當前網絡的時候同時更新目標網絡,相當於更新參數的同時,標簽也改變了,這容易使得網絡每次更新后相當於“重新訓練”網絡,收斂效果不好。
DQN的流程圖如下:
DQN算法代碼如下:
"""
DQN
"""
class DQN:
def __init__(self, model, gamma=0.9, learning_rate=0.01):
self.model = model.model
self.target_model = model.target_model
self.gamma = gamma
self.lr = learning_rate
# --------------------------訓練模型--------------------------- #
self.model.optimizer = tf.optimizers.Adam(learning_rate=self.lr)
self.model.loss_func = tf.losses.MeanSquaredError()
# self.model.train_loss = tf.metrics.Mean(name="train_loss")
# ------------------------------------------------------------ #
self.global_step = 0
self.update_target_steps = 200 # 每隔200個training steps再把model的參數復制到target_model中
self.loss_table = []
def predict(self, obs):
"""
使用self.model的value網絡來獲取 [Q(s,a1),Q(s,a2),...]
"""
return self.model.predict(obs)
def _train_step(self, action, features, labels):
"""
訓練步驟
"""
with tf.GradientTape() as tape:
# 計算 Q(s,a) 與 target_Q的均方差,得到loss
predictions = self.model(features, training=True)
enum_action = list(enumerate(action))
pred_action_value = tf.gather_nd(predictions, indices=enum_action)
loss = self.model.loss_func(labels, pred_action_value)
self.loss_table.append(loss)
gradients = tape.gradient(loss, self.model.trainable_variables)
self.model.optimizer.apply_gradients(zip(gradients, self.model.trainable_variables))
# self.model.train_loss.update_state(loss)
def _train_model(self, action, features, labels, epochs=1):
"""
訓練模型
"""
for epoch in tf.range(1, epochs+1):
self._train_step(action, features, labels)
def learn(self, obs, action, reward, next_obs, terminal):
"""
使用DQN算法更新self.model的value網絡
"""
# 每隔200個training steps同步一次model和target_model的參數
if (self.global_step > 200) and (self.global_step % self.update_target_steps == 0):
self.replace_target()
# 從target_model中獲取 max Q' 的值,用於計算target_Q
next_pred_value = self.model.predict(next_obs)
best_v = tf.reduce_max(next_pred_value, axis=1)
target = reward + self.gamma * best_v
# 訓練模型
self._train_model(action, obs, target, epochs=1)
self.global_step += 1
def replace_target(self):
"""
預測模型權重更新到target模型權重
"""
self.target_model.get_layer(name='l1').set_weights(self.model.get_layer(name='l1').get_weights())
# self.target_model.get_layer(name='l2').set_weights(self.model.get_layer(name='l2').get_weights())
self.target_model.get_layer(name='l3').set_weights(self.model.get_layer(name='l3').get_weights())
DQN記憶庫代碼如下:
"""
ReplayMemory of Neture DQN
"""
class ReplayMemory:
def __init__(self,max_size):
self.buffer = collections.deque(maxlen=max_size)
def append(self,exp):
self.buffer.append(exp)
def sample(self, batch_size):
mini_batch = random.sample(self.buffer, batch_size)
obs_batch, action_batch, reward_batch, next_obs_batch, done_batch = [], [], [], [], []
for experience in mini_batch:
s, a, r, s_p, done = experience
obs_batch.append(s)
action_batch.append(a)
reward_batch.append(r)
next_obs_batch.append(s_p)
done_batch.append(done)
return np.array(obs_batch).astype('float32'), \
np.array(action_batch).astype('int32'), np.array(reward_batch).astype('float32'),\
np.array(next_obs_batch).astype('float32'), np.array(done_batch).astype('float32')
def __len__(self):
return len(self.buffer)
后續優化版本:
- Double DQN:通過改變target q的計算來優化
- Prioritized Replay DQN:通過改變經驗回放的結構和方式來優化
- Dueling DQN:通過改變神經網絡的結構來優化
1.3.2 Double Deep Q-Network
DDQN用於解決DQN中Q Network對於價值估計過高的問題,Target Q的選取仍然從目標網絡中輸出輸入狀態的所有動作的價值Q,但是選取哪一個動作不再是依照最大值,而是使用預測網絡中輸入狀態的輸出動作的最大值的索引來選取。DDQN中的Target Q如下:
1.3.3 Prioritized Experience Replay (DQN)
Prioritized Experience DQN將記憶庫中的記錄根據TD-error(Target Q - Q)進行排序,TD-error越大說明該記錄越應該被學習。為此需要修改原來記憶庫的數據結構,使用Jaromír Janisch提出的SumTree(一種完全二叉樹)和對應的記憶庫來存儲。記憶都存儲於葉子節點,非葉節點的值為子節點之和,這種結構在存儲新節點時只需插入在末端並更新相應祖先節點即可,比其他排序算法計算的時間更少。
難點在於ISWeight的作用和計算:
Importance-Sampling Weights, 用來恢復被 Prioritized replay 打亂的抽樣概率分布.
SumTree以及記憶庫代碼如下:
"""
SumTree and memory in Prioritized Experience DQN
"""
class SumTree(object):
data_pointer = 0
def __init__(self, capacity):
# data_pointer即記憶data的索引,加上(self.capacity - 1)變為SumTree索引
self.capacity = capacity # 容量
self.tree = np.zeros(2 * capacity - 1) # SumTree,負責記錄記憶的優先級,結構類似滿二叉樹?
self.data = np.zeros(capacity, dtype=object) # 葉子節點數據,負責記錄記憶內容
def add(self, p, data):
# 猜測:data_pointer指向記憶數據末尾
tree_idx = self.data_pointer + self.capacity - 1 # tree_idx指向SumTree末尾
self.data[self.data_pointer] = data # 更新記憶數據
self.update(tree_idx, p) # 更新SumTree
self.data_pointer += 1
if self.data_pointer >= self.capacity:
self.data_pointer = 0
def update(self, tree_idx, p):
change = p - self.tree[tree_idx]
self.tree[tree_idx] = p
# 更新祖先節點
while tree_idx != 0:
tree_idx = (tree_idx - 1) // 2
self.tree[tree_idx] += change
def get_leaf(self, v):
"""
關於SumTree的記憶選取可查看一下文檔:
https://zhuanlan.zhihu.com/p/165134346
"""
parent_idx = 0
while True: # while循環比論文代碼快
cl_idx = 2 * parent_idx + 1 # 左子節點
cr_idx = cl_idx + 1 # 右子節點
if cl_idx >= len(self.tree):
leaf_idx = parent_idx
break
else:
if v <= self.tree[cl_idx]:
parent_idx = cl_idx
else:
v -= self.tree[cl_idx]
parent_idx = cr_idx
data_idx = leaf_idx - self.capacity + 1 # 轉為記憶數據的索引
return leaf_idx, self.tree[leaf_idx], self.data[data_idx]
@property
def total_p(self):
return self.tree[0]
class ReplayMemory(object):
epsilon = 0.01 # 避免0優先級的最小優先級
alpha = 0.6 # [0~1] 將td-error轉化為優先級
beta = 0.4 # ? importance-sampling, from initial value increasing to 1
beta_increment_per_sampling = 0.001 # ?
abs_err_upper = 1. # 初始化誤差絕對值為1 clipped abs error
def __init__(self, capacity):
self.tree = SumTree(capacity)
def sample(self, n):
b_idx, b_memory, ISWeights = np.empty((n,), dtype=np.int32), np.empty((n, self.tree.data[0].size)), np.empty(
(n, 1))
pri_seg = self.tree.total_p / n # 分成batch個區間
# beta = 0.4 每次采樣都會增大0.001,最大值為1
self.beta = np.min([1., self.beta + self.beta_increment_per_sampling])
# 最小優先級占所有優先級之和的比重,由於后面計算ISWeights
min_prob = np.min(self.tree.tree[-self.tree.capacity:]) / self.tree.total_p
# print('min of 優先級: ' + str(self.tree.tree[-self.tree.capacity:]))
# print('total_p: ' + str(self.tree.total_p))
print('min_prob: ' + str(min_prob))
for i in range(n):
a, b = pri_seg * i, pri_seg * (i + 1) # 每個區間的上下界
v = np.random.uniform(a, b) # 在該區間隨機抽取一個數
idx, p, data = self.tree.get_leaf(v) # 該數對應的優先級
prob = p / self.tree.total_p # 當前優先級占所有優先級之和的比重
# 疑惑點:
# prob / min_prob 當前優先級和最小優先級之比
# ISWeights<=1 且僅當優先級最小時為1,優先級越大ISWeights越小,
# 而且隨着采樣次數增多,beta逐漸增大,而ISWeight逐漸減小
ISWeights[i, 0] = np.power(prob / min_prob, -self.beta)
b_idx[i], b_memory[i, :] = idx, data
return b_idx, b_memory, ISWeights
def store(self, transition):
max_p = np.max(self.tree.tree[-self.tree.capacity:])
if max_p == 0:
max_p = self.abs_err_upper
self.tree.add(max_p, transition)
def batch_update(self, tree_idx, abs_errors):
abs_errors += self.epsilon # 避免為0
"""
比較兩個數組,將每個位置數值較小的值填入一個新數組並返回,
若兩個數組其中一個是單個數字,使用廣播機制擴充為相同尺寸的數組
self.abs_err_upper為1,因此下面語句是abs_errors中所有大於1的值化為1
以保證所有error<=1
"""
clipped_errors = np.minimum(abs_errors, self.abs_err_upper)
ps = np.power(clipped_errors, self.alpha) # td-error轉化為優先級
for ti, p in zip(tree_idx, ps):
self.tree.update(ti, p)
1.3.4 Dueling DQN
Dueling DQN對DQN的神經網絡進行了優化,將輸出的狀態動作價值Q(s, a)分為了狀態價值V(s) 和優勢Advantage(s, a)之和進行表示:
網絡結構修改為:
神經網絡模型的代碼如下:
"""
Network of Dueling DQN
"""
class Model:
def __init__(self, obs_n, act_dim):
self.obs_n = obs_n
self.act_dim = act_dim
self._build_model()
def _build_layers(self):
dense_size1 = 30
dense_size2 = 16
dense_size2 = 40
# 定義輸入層
input_node = Input(shape=self.obs_n)
input_layer = input_node
# 第一層
layer1 = layers.Dense(
dense_size1,
kernel_initializer=tf.random_normal_initializer(0., 0.3),
bias_initializer=tf.constant_initializer(0.1),
# activation='relu',
activation=layers.LeakyReLU(alpha=0.05),
name='l1'
)(input_layer)
# 計算狀態價值
state_value = layers.Dense(
dense_size2,
kernel_initializer=tf.random_normal_initializer(0., 0.3),
bias_initializer=tf.constant_initializer(0.1),
activation=layers.LeakyReLU(alpha=0.05),
name='state_value_1'
)(layer1)
state_value = layers.Dense(
1,
kernel_initializer=tf.random_normal_initializer(0., 0.3),
bias_initializer=tf.constant_initializer(0.1),
activation=layers.LeakyReLU(alpha=0.05),
name='state_value_2'
)(state_value)
# 計算行動優勢
action_advantage = layers.Dense(
dense_size2,
kernel_initializer=tf.random_normal_initializer(0., 0.3),
bias_initializer=tf.constant_initializer(0.1),
activation=layers.LeakyReLU(alpha=0.05),
name='action_advantage_1'
)(layer1)
action_advantage = layers.Dense(
self.act_dim,
kernel_initializer=tf.random_normal_initializer(0., 0.3),
bias_initializer=tf.constant_initializer(0.1),
activation=layers.LeakyReLU(alpha=0.05),
name='action_advantage_2'
)(action_advantage)
# 計算Q值
q = layers.add([state_value, action_advantage])
# 定義模型
model = tf.keras.Model(inputs=input_node, outputs=q)
model.summary()
return model
def _build_model(self):
self.model = self._build_layers()
self.target_model = self._build_layers()
2. policy-based 基於策略的算法
基於策略算法不使用價值函數而是使用策略函數來輸出動作的概率,不會使用固定的動作選擇,最大的特點是將動作使用概率表示,因此所有動作都有可能被選到,而且得益於概率表示,可以將動作映射到分布中,因此可以設置高位的動作和連續動作。
Policy Gradient這種基於策略算法要做的,就是最大化獎勵期望`Rθ;而像dqn這種基於價值要做的,是最大化獎勵 Rθ 。policy-based流程如下:
更新網絡參數θ -- 如何更新使得期望最大化 -- 朝梯度上升方向更新
Policy Gradient算法可以根據更新方式分為兩大類:
MC更新方法:Reinfoce算法;
TD更新方法:Actor-Critic算法;
2.1 REINFORCE
與value-based方法的區別:
- 使用動作概率表示:在目標網絡最后一層輸出使用softmax,用動作概率而不是動作價值來表示
- 回合更新:在一個回合結束之后才學習,而不像dqn一樣幾步一學習。回合更新的優點是准確。對於深度強化學習來說,前期的標簽可能具有欺騙性?這時的過早學習可能會誤導網絡的參數,使用回合更新可以使這種趨勢減小;回合更新的缺點是相對耗時。對於一些需要長步數的回合,需要較長時間才可以學習到新的參數。
細節:
- 使用梯度上升來反向傳播,並且直接使用獎勵而不是計算誤差來作為反向傳播的輸入。
- 減弱drl中標簽欺騙性的方法:對於上面講到的深度強化學習的標簽的欺騙性(標簽是真實的動作,並不是正確的動作),Policy Gradient使用一個方法來減少這個趨勢:將標簽和輸出的差距乘以回報,用回報來評價這個誤差的重要性,如果回報是正數而且數值很大,則這個loss的權重很大,若數值很小甚至為負值(不清楚是否可以為負值),則說明這個loss權重很小甚至起反效果。
- Policy Gradient的網絡輸出是動作概率而不是動作價值,問題類似於分類問題,即對於輸入的圖像(state)判斷屬於某種類型(要使用某種action)的概率,因此構造的網絡的輸出函數可以用分類問題常用的softmax
優點:
- 因為使用動作概率,所以可以適用於隨機性問題
- 對於不好計算價值的模型更適用
- 對於連續的動作空間(高維)更適用
缺點:
- 容易局部最優
優化:
-
獎勵基准值:對於獎勵總為正的環境,難以判斷某些軌跡是否真正有幫助,對獎勵設置一個基准值(一般設置為所有軌跡的總獎勵的平均數),用軌跡的獎勵減去基准值來判斷。
-
動作評分:軌跡中並不是每一個動作都對於最終的分數有正確作用,但是在計算期望的時候,每個動作的權重都是使用軌跡的總獎勵,因此可以對每個動作分別設置權重,用這個動作開始后的獎勵之和來反映這個動作的價值(權重)。
代碼如下:
"""
Policy Gradient
"""
class PolicyGradient:
def __init__(self,
n_action,
shape_state,
n_layers,
size,
reward_decay=0.99,
learning_rate=0.01,
batch_size=1,
max_t_length=500
):
if size is None:
size = [64]
self.n_action = n_action
self.shape_state = shape_state
self.n_layers = n_layers
self.size = size
self.reward_decay = reward_decay
self.learning_rate = learning_rate
self.batch_size = batch_size
self.max_t_length = max_t_length
self.model = self._build_net()
self.optimizer = tf.keras.optimizers.Adam(learning_rate=learning_rate)
self.transitions = self.reset_transition()
self.p_transitions = -1
def _build_net(self):
kernel_init = 'he_normal' # 正態分布
bias_init = tf.constant_initializer(0.01) # 常數
model = tf.keras.Sequential()
for i in range(self.n_layers):
model.add(layers.Dense(self.size[i],
activation=tf.nn.tanh,
kernel_initializer=kernel_init,
bias_initializer=bias_init))
model.add(layers.Dense(self.n_action,
activation=tf.nn.tanh,
kernel_initializer=kernel_init,
bias_initializer=bias_init))
return model
def add_store_t(self):
self.transitions.append([[], [], []])
self.p_transitions += 1
def store_transition(self, s, a, r):
self.transitions[self.p_transitions][0].append(s)
self.transitions[self.p_transitions][1].append(a)
self.transitions[self.p_transitions][2].append(r)
def reset_transition(self):
self.p_transitions = -1
return []
def choose_action(self, state):
# 轉換state格式,(state_dim,) => (batch, state_dim) 此時 batch=1
state = state[np.newaxis, :]
state = tf.convert_to_tensor(state, dtype=tf.float32)
# 得到每個動作概率,由於每個網絡最后的輸出函數是tanh,需要再加一個softmax
p_actions = self.model(state)
p_actions = tf.nn.softmax(p_actions)
# 按概率選擇
action = np.random.choice(np.arange(self.n_action), p=p_actions.numpy()[0])
# print('查看p_actions格式: ' + str(p_actions))
return action
def sum_of_rewards(self):
res = []
for t in range(len(self.transitions)):
tmp = 0
t_rewards = self.transitions[t][2]
t_rewards_to_go = np.zeros_like(t_rewards)
for i in reversed(range(0, len(t_rewards))):
tmp = tmp * self.reward_decay + t_rewards[i]
t_rewards_to_go[i] = tmp
res.append(t_rewards_to_go)
return np.concatenate(res)
def learn(self):
tmp = np.array(self.transitions[0][0])
print(tmp.shape)
print(np.array([t[0] for t in self.transitions]))
obs = np.concatenate([t[0] for t in self.transitions])
acs = np.concatenate([t[1] for t in self.transitions])
res = self.sum_of_rewards()
with tf.GradientTape() as tape:
# 預測值
pred_p_actions = self.model(obs)
pred_p_actions = tf.nn.softmax(pred_p_actions)
neg_log_prob = tf.reduce_sum(-tf.math.log(pred_p_actions) * tf.one_hot(acs, depth=self.n_action), axis=1)
loss = tf.reduce_sum(neg_log_prob * res) / self.batch_size
grads = tape.gradient(loss, self.model.trainable_variables)
self.optimizer.apply_gradients(zip(grads, self.model.trainable_variables))
def train(self, env, seed, n_episodes):
np.random.seed(seed)
tf.random.set_seed(seed)
env.seed(seed)
for i_episode in range(n_episodes):
state = env.reset()
self.add_store_t()
steps = 0
while True:
env.render()
action = self.choose_action(state)
next_state, reward, done, info = env.step(action)
self.store_transition(state, action, reward)
state = next_state
# 這里使用了回合更新,而且是單回合更新,從理論上說,越多回合作為一個batch越好
if done or steps > self.max_t_length:
print('done: ' + str(done) + ' steps: ' + str(steps))
if i_episode % self.batch_size == 0:
self.learn()
self.transitions = self.reset_transition()
break
3. Actor-Critic 結構的算法
3.1 Actor-Critic(AC)
使用TD-error方法改進REINFORCE從回合更新改為單步更新。用 TD 比較穩,用 MC 比較精確。
Q:為什么說 REINFORCE 方差大?
A:受知乎答案啟發 強化學習,方差比較大是說什么的方差大,為啥方差比較大? - 知乎,方差大指的是同樣的策略下,輸出的回報變化很大(有點過擬合的意思)。在 REINFORCE 中一次只采集一個或幾個完整軌跡(多了采集時間久)進行學習,導致了方差較大。(Following from 第九章 演員-評論家算法)假設我們可以采樣足夠的次數,在每次更新參數之前,我們都可以采樣足夠的次數,那其實沒有什么問題。但問題就是我們每次做 policy gradient,每次更新參數之前都要做一些采樣,這個采樣的次數其實是不可能太多的,我們只能夠做非常少量的采樣。如果你正好采樣到差的結果,比如說你采樣到 G = 100,采樣到 G = -10,那顯然你的結果會是很差的。
Q:怎么拿期望值代替采樣的值呢?
A:這邊就需要引入基於價值的(value-based)的方法。基於價值的方法就是 Q-learning。Q-learning 有兩種函數,有兩種 critics。
- 第一種 critic 是 Vπ(s),它的意思是說,假設 actor 是 π,拿 π 去跟環境做互動,當我們看到狀態 s 的時候,接下來累積獎勵 的期望值有多少。
- 還有一個 critic 是 Qπ(s,a)。Qπ(s,a) 把 s 跟 a 當作輸入,它的意思是說,在狀態 s 采取動作 a,接下來都用 actor π 來跟環境進行互動,累積獎勵的期望值是多少。
在Policy Gradient中的梯度更新為:
Actor-Critic在Policy Gradient的基礎上引入了value-based方法中值函數的概念來預測 Gt 的期望值,將Policy Gradient的梯度更新中的 Gt 轉為 Q(s, a) 函數,baseline 轉為 V(s) 函數,
3.2 Advantage Actor-Critic(A2C)
名稱中的 Advantage 代表 advantage function 優勢函數,將 Q(s, a) 用 r + V(s) 代替,從而將Q函數簡化,使只需要訓練一個 V(s) 函數的網絡而不用訓練 Q(s, a) 函數網絡
為什么可以用 r + V(s) 的形式代替 Q(s, a) 呢,首先Q函數本身可以寫成:
在這里我們將期望去掉即變成 r+v(s, a)形式 ,雖然增加了隨機性、增大了方差,但是相對於 Gt 的方差小了很多,可以去掉期望還算合理。
至於為什么兩種都行(使用q v和只是用v的兩種)而選擇使用后者呢?原始的 A3C paper 試了各種方法,最后做出來就是這個最好。233
在實際操作的時候,雖然理論上需要兩個網絡: policy 網絡和 V(s) 網絡。但是這兩個網絡的輸入都是 s ,只是輸出不同(動作的分布和 s 的價值),所以這兩個網絡可以共用前面的層,用來將 s 抽象成高級的信息。
A2C流程圖如下:
A2C代碼如下:
"""
Advantage Actor-Critic
"""
class ActorCritic:
def __init__(self,
env, # 環境
action_space, # 動作類型:連續/離散
n_action, # 動作維度(連續)/數量(離散)
obs_space, # 觀察類型:連續/離散
n_obs, # 狀態維度(連續)/數量(離散)
max_steps_per_episodes, # 每回合最大步數
learning_rate, # 學習率
reward_decay, # 獎勵衰減
n_batch # 定義一組學習數組大小
):
self.action_space = action_space
self.n_action = n_action
self.obs_space = obs_space
self.n_obs = n_obs
self.max_steps_per_episodes = max_steps_per_episodes
self.learning_rate = learning_rate
self.reward_decay = reward_decay
self.n_batch = n_batch
self.n_layers = 2 # 中間隱藏層數量,不包括輸出層
self.n_units = 128
self.activation = 'relu'
self.kernel_initializer = 'he_normal' # 默認正太分布
self.bias_initializer = tf.constant_initializer(0.01) # 常數
self.policy_model, self.critic_model = self._build_net()
self.policy_model.compile(optimizer=optimizers.Adam(self.learning_rate),
loss=self._policy_loss)
self.critic_model.compile(optimizer=optimizers.Adam(self.learning_rate),
loss=self._critic_loss)
self.learning_times = 0
self.output_env_info = False
self.transitions = {
'obs': np.empty((self.n_batch,) + env.observation_space.shape),
'acs': np.empty((self.n_batch,), dtype=np.int32),
'res': np.empty((self.n_batch,)),
'done': np.empty((self.n_batch,))
}
def _build_net(self):
policy_model = tf.keras.Sequential() # 策略網絡
for i in range(self.n_layers):
name = 'layer' + str(i)
policy_model.add(layers.Dense(self.n_units,
activation=self.activation,
kernel_initializer=self.kernel_initializer,
bias_initializer=self.bias_initializer,
name=name))
policy_model.add(layers.Dense(self.n_action,
activation=tf.nn.softmax,
kernel_initializer=self.kernel_initializer,
bias_initializer=self.bias_initializer,
name='layer' + str(self.n_layers)))
critic_model = tf.keras.Sequential() # 評價網絡
for i in range(self.n_layers):
name = 'layer' + str(i)
critic_model.add(layers.Dense(self.n_units,
activation=tf.nn.softmax,
kernel_initializer=self.kernel_initializer,
bias_initializer=self.bias_initializer,
name=name))
critic_model.add(layers.Dense(1,
activation=self.activation,
kernel_initializer=self.kernel_initializer,
bias_initializer=self.bias_initializer,
name='layer' + str(self.n_layers)))
return policy_model, critic_model
def _policy_loss(self, y_true, y_pred):
actions, advantages = tf.split(y_true, 2, axis=-1)
loss_function = losses.SparseCategoricalCrossentropy(from_logits=True)
actions = tf.cast(actions, tf.int32)
loss = loss_function(actions, y_pred, sample_weight=advantages)
return loss
def _critic_loss(self, y_true, y_pred):
return losses.mean_squared_error(y_true, y_pred)
def choose_action(self, obs):
# 轉為列向量,表示每個輸入都是一維的,使用行向量會導致所有obs作為一個輸入
obs = obs[np.newaxis, :]
obs = tf.convert_to_tensor(obs)
p_actions = self.policy_model.predict_on_batch(obs)
# print(p_actions)
action = np.random.choice(np.arange(self.n_action), p=np.array(p_actions)[0])
return action
def learn(self):
obs = self.transitions['obs']
actions = self.transitions['acs']
rewards = self.transitions['res']
done = self.transitions['done']
values = tf.squeeze(self.critic_model(obs[np.newaxis, :]))
td_error = np.append(np.zeros_like(rewards), [values[-1]], axis=-1)
# td_error = np.append(np.zeros_like(rewards), [], axis=-1)
for t in reversed(range(rewards.shape[0])):
td_error[t] = rewards[t] + td_error[t + 1] * done[t]
td_error = td_error[:-1]
advantages = td_error - values
actions_and_advantages = np.concatenate([actions[:, None], advantages[:, None]], axis=-1)
self.policy_model.train_on_batch(obs, actions_and_advantages) # arg1 input; arg2 label
self.critic_model.train_on_batch(obs, td_error)
self.transitions = {
'obs': np.empty((self.n_batch,) + env.observation_space.shape),
'acs': np.empty((self.n_batch,), dtype=np.int32),
'res': np.empty((self.n_batch,)),
'done': np.empty((self.n_batch,))
} # 清空數組
def store_transition(self, index, obs, action, reward, done):
self.transitions['obs'][index] = obs.copy()
self.transitions['acs'][index] = action
self.transitions['res'][index] = reward
self.transitions['done'][index] = done
def train(self, env, n_episodes, render=False):
reward_history = []
total_steps = 0
# 嘗試將 n_episodes 當作學習次數
obs_ = env.reset()
reward_history = [0.]
for i in range(n_episodes):
for step in range(self.n_batch):
if render:
env.render()
obs = obs_.copy()
action = self.choose_action(obs)
obs_, reward, done, info = env.step(action)
self.store_transition(step, obs, action, reward, done)
reward_history[-1] += reward
if done:
obs_ = env.reset()
print('#' + str(i) + ' reward: ' + str(reward_history[-1]))
reward_history.append(0.)
self.learn()
return reward_history
def test(self, env, n_episodes, render=False):
total_reward = 0
reward_history = []
for i in n_episodes:
obs = env.reset()
while True:
if render:
env.render()
action = self.choose_action(obs)
obs_, reward, done, info = env.step(action)
total_reward += reward
obs = obs_
if done:
break
print('average reward is ' + str(total_reward / n_episodes))
3.3 Asynchronous Advantage Actor-Critic(A3C)
加入了異步操作,解決難收斂問題。具體是借助dqn中經驗回放experience replay的思想並加以改進,在使用多個線程同時與環境交互,並更新公共網絡。
3.4 Deep Deterministic Policy Gradient(DDPG)
再次回到Q learning,前面我們針對高維狀態動作對帶來的數據量過多問題其引入了神經網絡,帶來了DQN,現在針對DQN無法拓展到連續動作引入了策略網絡,組成AC結構,在此基礎上保留了DQN的優化方法:經驗回放和目標網絡,形成了DDPFG。
DDPG算法流程如下:
3.5 Twin Delayed DDPG(TD3)
DDPG 常見的問題是已經學習好的 Q 函數開始顯著地高估 Q 值,然后導致策略被破壞了,因為它利用了 Q 函數中的誤差。為了解決這個問題,TD3的作者使用了三個技巧來優化:
-
截斷的雙 Q 學習(Clipped Dobule Q-learning) 。TD3 學習兩個 Q-function(因此名字中有 “twin”)。TD3 通過最小化均方差來同時學習兩個 Q-function:Q1 和 Q2。兩個 Q-function 都使用一個目標,兩個 Q-function 中給出較小的值會被作為如下的 Q-target:
-
延遲的策略更新(“Delayed” Policy Updates) 。相關實驗結果表明,同步訓練動作網絡和評價網絡,卻不使用目標網絡,會導致訓練過程不穩定;但是僅固定動作網絡時,評價網絡往往能夠收斂到正確的結果。因此 TD3 算法以較低的頻率更新動作網絡,較高頻率更新評價網絡,通常每更新兩次評價網絡就更新一次策略。
-
目標策略平滑(Target Policy smoothing) 。TD3 引入了 smoothing 的思想。TD3 在目標動作中加入噪音,通過平滑 Q 沿動作的變化,使策略更難利用 Q 函數的誤差。
算法流程如下:
TD3 代碼如下(我的TD3實驗效果不好,這里放出原作者的代碼,帶一點注釋):
"""
TD3
"""
class TD3:
def __init__(
self, state_dim, action_dim, action_range, hidden_dim, replay_buffer, policy_target_update_interval=1,
q_lr=3e-4, policy_lr=3e-4
):
self.replay_buffer = replay_buffer
# initialize all networks
self.q_net1 = QNetwork(state_dim, action_dim, hidden_dim)
self.q_net2 = QNetwork(state_dim, action_dim, hidden_dim)
self.target_q_net1 = QNetwork(state_dim, action_dim, hidden_dim)
self.target_q_net2 = QNetwork(state_dim, action_dim, hidden_dim)
self.policy_net = PolicyNetwork(state_dim, action_dim, hidden_dim, action_range)
self.target_policy_net = PolicyNetwork(state_dim, action_dim, hidden_dim, action_range)
print('Q Network (1,2): ', self.q_net1)
print('Policy Network: ', self.policy_net)
# initialize weights of target networks
self.target_q_net1 = self.target_ini(self.q_net1, self.target_q_net1)
self.target_q_net2 = self.target_ini(self.q_net2, self.target_q_net2)
self.target_policy_net = self.target_ini(self.policy_net, self.target_policy_net)
# set train mode
self.q_net1.train()
self.q_net2.train()
self.target_q_net1.eval()
self.target_q_net2.eval()
self.policy_net.train()
self.target_policy_net.eval()
self.update_cnt = 0
self.policy_target_update_interval = policy_target_update_interval
self.q_optimizer1 = tf.optimizers.Adam(q_lr)
self.q_optimizer2 = tf.optimizers.Adam(q_lr)
self.policy_optimizer = tf.optimizers.Adam(policy_lr)
def target_ini(self, net, target_net):
""" hard-copy update for initializing target networks """
for target_param, param in zip(target_net.trainable_weights, net.trainable_weights):
target_param.assign(param)
return target_net
def target_soft_update(self, net, target_net, soft_tau):
""" soft update the target net with Polyak averaging """
for target_param, param in zip(target_net.trainable_weights, net.trainable_weights):
target_param.assign( # copy weight value into target parameters
target_param * (1.0 - soft_tau) + param * soft_tau
)
return target_net
def update(self, batch_size, eval_noise_scale, reward_scale=10., gamma=0.9, soft_tau=1e-2):
""" update all networks in TD3 """
self.update_cnt += 1
state, action, reward, next_state, done = self.replay_buffer.sample(batch_size)
reward = reward[:, np.newaxis] # expand dim
done = done[:, np.newaxis]
new_next_action = self.target_policy_net.evaluate(
next_state, eval_noise_scale=eval_noise_scale
) # clipped normal noise
reward = reward_scale * (reward - np.mean(reward, axis=0)) / (
np.std(reward, axis=0) + 1e-6
) # normalize with batch mean and std; plus a small number to prevent numerical problem
# Training Q Function
target_q_input = tf.concat([next_state, new_next_action], 1) # the dim 0 is number of samples
target_q_min = tf.minimum(self.target_q_net1(target_q_input), self.target_q_net2(target_q_input))
target_q_value = reward + (1 - done) * gamma * target_q_min # if done==1, only reward
q_input = tf.concat([state, action], 1) # input of q_net
with tf.GradientTape() as q1_tape:
predicted_q_value1 = self.q_net1(q_input)
q_value_loss1 = tf.reduce_mean(tf.square(predicted_q_value1 - target_q_value))
q1_grad = q1_tape.gradient(q_value_loss1, self.q_net1.trainable_weights)
self.q_optimizer1.apply_gradients(zip(q1_grad, self.q_net1.trainable_weights))
with tf.GradientTape() as q2_tape:
predicted_q_value2 = self.q_net2(q_input)
q_value_loss2 = tf.reduce_mean(tf.square(predicted_q_value2 - target_q_value))
q2_grad = q2_tape.gradient(q_value_loss2, self.q_net2.trainable_weights)
self.q_optimizer2.apply_gradients(zip(q2_grad, self.q_net2.trainable_weights))
# Training Policy Function
if self.update_cnt % self.policy_target_update_interval == 0:
with tf.GradientTape() as p_tape:
# 更新actor的時候,我們不需要加上noise,這里是希望actor能夠尋着最大值。加上noise並沒有任何意義
new_action = self.policy_net.evaluate(
state, eval_noise_scale=0.0
) # no noise, deterministic policy gradients
new_q_input = tf.concat([state, new_action], 1)
# """ implementation 1 """
# predicted_new_q_value = tf.minimum(self.q_net1(new_q_input),self.q_net2(new_q_input))
""" implementation 2 """
predicted_new_q_value = self.q_net1(new_q_input)
policy_loss = -tf.reduce_mean(predicted_new_q_value)
p_grad = p_tape.gradient(policy_loss, self.policy_net.trainable_weights)
self.policy_optimizer.apply_gradients(zip(p_grad, self.policy_net.trainable_weights))
# Soft update the target nets
self.target_q_net1 = self.target_soft_update(self.q_net1, self.target_q_net1, soft_tau)
self.target_q_net2 = self.target_soft_update(self.q_net2, self.target_q_net2, soft_tau)
self.target_policy_net = self.target_soft_update(self.policy_net, self.target_policy_net, soft_tau)
def save(self): # save trained weights
path = os.path.join('model', '_'.join([ALG_NAME, ENV_ID]))
if not os.path.exists(path):
os.makedirs(path)
extend_path = lambda s: os.path.join(path, s)
tl.files.save_npz(self.q_net1.trainable_weights, extend_path('model_q_net1.npz'))
tl.files.save_npz(self.q_net2.trainable_weights, extend_path('model_q_net2.npz'))
tl.files.save_npz(self.target_q_net1.trainable_weights, extend_path('model_target_q_net1.npz'))
tl.files.save_npz(self.target_q_net2.trainable_weights, extend_path('model_target_q_net2.npz'))
tl.files.save_npz(self.policy_net.trainable_weights, extend_path('model_policy_net.npz'))
tl.files.save_npz(self.target_policy_net.trainable_weights, extend_path('model_target_policy_net.npz'))
def load(self): # load trained weights
path = os.path.join('model', '_'.join([ALG_NAME, ENV_ID]))
extend_path = lambda s: os.path.join(path, s)
tl.files.load_and_assign_npz(extend_path('model_q_net1.npz'), self.q_net1)
tl.files.load_and_assign_npz(extend_path('model_q_net2.npz'), self.q_net2)
tl.files.load_and_assign_npz(extend_path('model_target_q_net1.npz'), self.target_q_net1)
tl.files.load_and_assign_npz(extend_path('model_target_q_net2.npz'), self.target_q_net2)
tl.files.load_and_assign_npz(extend_path('model_policy_net.npz'), self.policy_net)
tl.files.load_and_assign_npz(extend_path('model_target_policy_net.npz'), self.target_policy_net)