原文地址:https://blog.csdn.net/qq_30615903/article/details/80744083
DQN(Deep Q-Learning)是將深度學習deeplearning與強化學習reinforcementlearning相結合,實現了從感知到動作的端到端的革命性算法。使用DQN玩游戲的話簡直6的飛起,其中fladdy bird這個游戲就已經被DQN玩壞了。當我們的Q-table他過於龐大無法建立的話,使用DQN是一種很好的選擇
1、算法思想
DQN與Qleanring類似都是基於值迭代的算法,但是在普通的Q-learning中,當狀態和動作空間是離散且維數不高時可使用Q-Table儲存每個狀態動作對的Q值,而當狀態和動作空間是高維連續時,使用Q-Table不動作空間和狀態太大十分困難。
所以在此處可以把Q-table更新轉化為一函數擬合問題,通過擬合一個函數function來代替Q-table產生Q值,使得相近的狀態得到相近的輸出動作。因此我們可以想到深度神經網絡對復雜特征的提取有很好效果,所以可以將DeepLearning與Reinforcement Learning結合。這就成為了DQN
DL與RL結合存在以下問題 :
- DL是監督學習需要學習訓練集,強化學習不需要訓練集只通過環境進行返回獎勵值reward,同時也存在着噪聲和延遲的問題,所以存在很多狀態state的reward值都是0也就是樣本稀疏
- DL每個樣本之間互相獨立,而RL當前狀態的狀態值是依賴后面的狀態返回值的。
- 當我們使用非線性網絡來表示值函數的時候可能出現不穩定的問題
DQN中的兩大利器解決了以上問題
- 通過Q-Learning使用reward來構造標簽
- 通過experience replay(經驗池)的方法來解決相關性及非靜態分布問題
- 使用一個MainNet產生當前Q值,使用另外一個Target產生Target Q
2、experience replay 經驗池
經驗池DQN中的記憶庫用來學習之前的經歷,又因為Q learning 是一種 off-policy 離線學習法, 它能學習當前經歷着的, 也能學習過去經歷過的, 甚至是學習別人的經歷,所以在學習過程中隨機的加入之前的經驗會讓神經網絡更有效率。
所以經驗池解決了相關性及非靜態分布問題。他通過在每個timestep下agent與環境交互得到的轉移樣本 $(s_t,a_t,r_t,s_{t+1})$ 儲存到回放記憶網絡,要訓練時就隨機拿出一些(minibatch)來訓練因此打亂其中的相關性。
3、Q-target 目標網絡
Q-targets的作用其實也是一種打亂相關性的機制,使用Q-targets會使得DQN中出現兩個結構完全相同但是參數卻不同的網絡,預測Q估計的的網絡MainNet使用的是最新的參數,而預測Q現實的神經網絡TargetNet參數使用的卻是很久之前的,$Q(s,a;θ_i)$表示當前網絡MainNet的輸出,用來評估當前狀態動作對的值函數;$Q(s,a;θ^−_i)$ 表示TargetNet的輸出,可以解出targetQ並根據LossFunction更新MainNet的參數,每經過一定次數的迭代,將MainNet的參數復制給TargetNet。
引入TargetNet后,再一段時間里目標Q值使保持不變的,一定程度降低了當前Q值和目標Q值的相關性,提高了算法穩定性。
4、算法流程
4.1、前置公式
DQN的更新方式和Qlearning一樣,詳細的值函數與動作值函數此處不再推導,在Qlearning中有詳細講解不了解的請移步上一篇博客
$$Q(s,a)←Q(s,a)+α[r+γmax_{a′}Q(s′,a′)−Q(s,a)]$$
DQN的損失函數如下 θ表示網絡參數為均方誤差損失
$$L(θ)=E[(TargetQ−Q(s,a;θ))^2]$$
$$TargetQ=r+γmax_{a′}Q(s′,a′;θ)$$
4.2、算法偽代碼
DQN中存在兩個結構完全相同但是參數卻不同的網絡,預測Q估計的網絡MainNet使用的是最新的參數,而預測Q現實的神經網絡TargetNet參數使用的卻是很久之前的, $Q(s,a;θ_i)$表示當前網絡MainNet的輸出,用來評估當前狀態動作對的值函數; $Q(s,a;θ^−_i)$表示TargetNet的輸出,可以解出targetQ,因此當agent對環境采取動作a時就可以根據上述公式計算出Q並根據LossFunction更新MainNet的參數,每經過一定次數的迭代,將MainNet的參數復制給TargetNet。這樣就完成了一次學習過程
4.3、算法流程圖
5、代碼實現
根據morvan老師的例子所得
class DeepQNetwork: def __init__( self, n_actions, n_features, learning_rate=0.01, reward_decay=0.9, e_greedy=0.9, replace_target_iter=300, memory_size=500, batch_size=32, e_greedy_increment=None, output_graph=True, ): self.n_actions = n_actions self.n_features = n_features self.lr = learning_rate self.gamma = reward_decay self.epsilon_max = e_greedy self.replace_target_iter = replace_target_iter self.memory_size = memory_size self.batch_size = batch_size self.epsilon_increment = e_greedy_increment self.epsilon = 0 if e_greedy_increment is not None else self.epsilon_max # 統計訓練次數 self.learn_step_counter = 0 # 初始化記憶 memory [s, a, r, s_] self.memory = np.zeros((self.memory_size, n_features * 2 + 2)) # 有兩個網絡組成 [target_net, evaluate_net] self._build_net() t_params = tf.get_collection('target_net_params') e_params = tf.get_collection('eval_net_params') self.replace_target_op = [tf.assign(t, e) for t, e in zip(t_params, e_params)] self.sess = tf.Session() if output_graph: # 開啟tensorboard # $ tensorboard --logdir=logs # tf.train.SummaryWriter soon be deprecated, use following tf.summary.FileWriter(r'D:\logs', self.sess.graph) self.sess.run(tf.global_variables_initializer()) self.cost_his = [] def _build_net(self): # -------------- 創建 eval 神經網絡, 及時提升參數 -------------- self.s = tf.placeholder(tf.float32, [None, self.n_features], name='s') # 用來接收 observation self.q_target = tf.placeholder(tf.float32, [None, self.n_actions], name='Q_target') # 用來接收 q_target 的值, 這個之后會通過計算得到 with tf.variable_scope('eval_net'): # c_names(collections_names) 是在更新 target_net 參數時會用到 c_names, n_l1, w_initializer, b_initializer = \ ['eval_net_params', tf.GraphKeys.GLOBAL_VARIABLES], 10, \ tf.random_normal_initializer(0., 0.3), tf.constant_initializer(0.1) # config of layers # eval_net 的第一層. collections 是在更新 target_net 參數時會用到 with tf.variable_scope('l1'): w1 = tf.get_variable('w1', [self.n_features, n_l1], initializer=w_initializer, collections=c_names) b1 = tf.get_variable('b1', [1, n_l1], initializer=b_initializer, collections=c_names) l1 = tf.nn.relu(tf.matmul(self.s, w1) + b1) # eval_net 的第二層. collections 是在更新 target_net 參數時會用到 with tf.variable_scope('l2'): w2 = tf.get_variable('w2', [n_l1, self.n_actions], initializer=w_initializer, collections=c_names) b2 = tf.get_variable('b2', [1, self.n_actions], initializer=b_initializer, collections=c_names) self.q_eval = tf.matmul(l1, w2) + b2 with tf.variable_scope('loss'): # 求誤差 self.loss = tf.reduce_mean(tf.squared_difference(self.q_target, self.q_eval)) with tf.variable_scope('train'): # 梯度下降 self._train_op = tf.train.RMSPropOptimizer(self.lr).minimize(self.loss) # ---------------- 創建 target 神經網絡, 提供 target Q --------------------- self.s_ = tf.placeholder(tf.float32, [None, self.n_features], name='s_') # 接收下個 observation with tf.variable_scope('target_net'): # c_names(collections_names) 是在更新 target_net 參數時會用到 c_names = ['target_net_params', tf.GraphKeys.GLOBAL_VARIABLES] # target_net 的第一層. collections 是在更新 target_net 參數時會用到 with tf.variable_scope('l1'): w1 = tf.get_variable('w1', [self.n_features, n_l1], initializer=w_initializer, collections=c_names) b1 = tf.get_variable('b1', [1, n_l1], initializer=b_initializer, collections=c_names) l1 = tf.nn.relu(tf.matmul(self.s_, w1) + b1) # target_net 的第二層. collections 是在更新 target_net 參數時會用到 with tf.variable_scope('l2'): w2 = tf.get_variable('w2', [n_l1, self.n_actions], initializer=w_initializer, collections=c_names) b2 = tf.get_variable('b2', [1, self.n_actions], initializer=b_initializer, collections=c_names) self.q_next = tf.matmul(l1, w2) + b2 def store_transition(self, s, a, r, s_): # 判斷是否包含對應屬性 沒有就賦予初值 if not hasattr(self, 'memory_counter'): self.memory_counter = 0 # 縱向延伸 transition = np.hstack((s, [a, r], s_)) # 使用新的記憶替換掉舊網絡的記憶 index = self.memory_counter % self.memory_size self.memory[index, :] = transition self.memory_counter += 1 def choose_action(self, observation): # 給觀測值加上batch_size維度 observation = observation[np.newaxis, :] if np.random.uniform() < self.epsilon: # forward feed the observation and get q value for every actions actions_value = self.sess.run(self.q_eval, feed_dict={self.s: observation}) action = np.argmax(actions_value) else: action = np.random.randint(0, self.n_actions) return action def learn(self): # 判斷是否應該更新target-net網絡了 if self.learn_step_counter % self.replace_target_iter == 0: self.sess.run(self.replace_target_op) print('\ntarget_params_replaced\n') # 從以前的記憶中隨機抽取一些記憶 if self.memory_counter > self.memory_size: sample_index = np.random.choice(self.memory_size, size=self.batch_size) else: sample_index = np.random.choice(self.memory_counter, size=self.batch_size) batch_memory = self.memory[sample_index, :] q_next, q_eval = self.sess.run( [self.q_next, self.q_eval], feed_dict={ self.s_: batch_memory[:, -self.n_features:], # fixed params self.s: batch_memory[:, :self.n_features], # newest params }) # change q_target w.r.t q_eval's action q_target = q_eval.copy() # 下面這幾步十分重要. q_next, q_eval 包含所有 action 的值, # 而我們需要的只是已經選擇好的 action 的值, 其他的並不需要. # 所以我們將其他的 action 值全變成 0, 將用到的 action 誤差值 反向傳遞回去, 作為更新憑據. # 這是我們最終要達到的樣子, 比如 q_target - q_eval = [1, 0, 0] - [-1, 0, 0] = [2, 0, 0] # q_eval = [-1, 0, 0] 表示這一個記憶中有我選用過 action 0, 而 action 0 帶來的 Q(s, a0) = -1, 所以其他的 Q(s, a1) = Q(s, a2) = 0. # q_target = [1, 0, 0] 表示這個記憶中的 r+gamma*maxQ(s_) = 1, 而且不管在 s_ 上我們取了哪個 action, # 我們都需要對應上 q_eval 中的 action 位置, 所以就將 1 放在了 action 0 的位置. # 下面也是為了達到上面說的目的, 不過為了更方面讓程序運算, 達到目的的過程有點不同. # 是將 q_eval 全部賦值給 q_target, 這時 q_target-q_eval 全為 0, # 不過 我們再根據 batch_memory 當中的 action 這個 column 來給 q_target 中的對應的 memory-action 位置來修改賦值. # 使新的賦值為 reward + gamma * maxQ(s_), 這樣 q_target-q_eval 就可以變成我們所需的樣子. # 具體在下面還有一個舉例說明. batch_index = np.arange(self.batch_size, dtype=np.int32) eval_act_index = batch_memory[:, self.n_features].astype(int) reward = batch_memory[:, self.n_features + 1] q_target[batch_index, eval_act_index] = reward + self.gamma * np.max(q_next, axis=1) """ 假如在這個 batch 中, 我們有2個提取的記憶, 根據每個記憶可以生產3個 action 的值: q_eval = [[1, 2, 3], [4, 5, 6]] q_target = q_eval = [[1, 2, 3], [4, 5, 6]] 然后根據 memory 當中的具體 action 位置來修改 q_target 對應 action 上的值: 比如在: 記憶 0 的 q_target 計算值是 -1, 而且我用了 action 0; 記憶 1 的 q_target 計算值是 -2, 而且我用了 action 2: q_target = [[-1, 2, 3], [4, 5, -2]] 所以 (q_target - q_eval) 就變成了: [[(-1)-(1), 0, 0], [0, 0, (-2)-(6)]] 最后我們將這個 (q_target - q_eval) 當成誤差, 反向傳遞會神經網絡. 所有為 0 的 action 值是當時沒有選擇的 action, 之前有選擇的 action 才有不為0的值. 我們只反向傳遞之前選擇的 action 的值, """ # 訓練eval網絡 _, self.cost = self.sess.run([self._train_op, self.loss], feed_dict={self.s: batch_memory[:, :self.n_features], self.q_target: q_target}) self.cost_his.append(self.cost) # 因為在訓練過程中會逐漸收斂所以此處動態設置增長epsilon self.epsilon = self.epsilon + self.epsilon_increment if self.epsilon < self.epsilon_max else self.epsilon_max self.learn_step_counter += 1