簡介
這篇筆記主要是記錄了百度PARL的學習過程中感覺還比較經典且入門的部分。
CartPole也相當於強化學習里面的Helloworld了吧。
環境描述
基本環境可以參考:https://gym.openai.com/envs/CartPole-v1/ 以及https://github.com/PaddlePaddle/PARL/tree/develop/examples/DQN
學習的目標是使得木棍在小車上樹立的時間盡量長。action的選擇只有向左或者是向右。環境會自動給出給出反饋,每一步后的得分,下一個局面的描述的狀態,是否是結束。環境狀態被gym自動封裝成一個np.array,可以通過有關的API獲取信息。 在這個例子中,環境的描述是一個4維的向量,我們不必管這4維向量的意義,只需要知道有這個描述即可(當然,如果你感興趣,可以深究)。每個環境,gym都封裝了一分數reward。而且,如果是結束狀態,gym會給出描述符。這些在下面的代碼中會有說明。
算法介紹和說明
先給出基本算法描述,算法來自上面的參考連接:
這是一個最基本的Off-Policy借助Replay-Buffer和神經網絡實現的算法。上面的ϕ,是表示一個連貫的輸入,因為上述的算法是輸入了一系列的圖片。不過在這個例子中,可以把ϕ理解成僅僅輸入當前的局面,即。之后會有exploration的操作,這是為了隨機的選取那些評估分數比較低,但是可能會有較好表現的行動。Q(s,a) Q(s,a)Q(s,a)表示一個Q-function,它的作用是給狀態s下的每個行動a一個評估分數。實際操作中,Q是一個神經網絡,每個狀態作為神經網絡的輸入,神經網絡的輸出是所有的行動a的評估分數。算法給出了 yi的計算法則。對神經網絡進行BP的時候,就根據這個公式來即可。每次從buffer中選取一個批次的數據,執行隨機梯度下降SGD算法,即可進行修正。
代碼示例(在AI Studio平台)
Step1 安裝依賴
!pip uninstall -y parl !pip uninstall -y pandas scikit-learn !pip install gym !pip install paddlepaddle==1.6.3 #盡量確保版本為此 !pip install parl==1.3.1 #盡量確保版本為此
Step2 導入依賴
import parl from parl import layers import paddle.fluid as fluid import copy import numpy as np import os import gym from parl.utils import logger
Step3 設置超參數
LEARN_FREQ = 5 # 訓練頻率,不需要每一個step都learn,攢一些新增經驗后再learn,提高效率 MEMORY_SIZE = 20000 # replay memory的大小,越大越占用內存 MEMORY_WARMUP_SIZE = 200 # replay_memory 里需要預存一些經驗數據,再開啟訓練 BATCH_SIZE = 32 # 每次給agent learn的數據數量,從replay memory隨機里sample一批數據出來 LEARNING_RATE = 0.001 # 學習率 GAMMA = 0.99 # reward 的衰減因子,一般取 0.9 到 0.999 不等
Step4 搭建Model、Algorithm、Agent架構
class Model(parl.Model): def __init__(self, act_dim): hid1_size = 128 hid2_size = 128 # 3層全連接網絡 self.fc1 = layers.fc(size=hid1_size, act='relu') self.fc2 = layers.fc(size=hid2_size, act='relu') self.fc3 = layers.fc(size=act_dim, act=None) def value(self, obs): # 定義網絡 # 輸入state,輸出所有action對應的Q,[Q(s,a1), Q(s,a2), Q(s,a3)...] h1 = self.fc1(obs) h2 = self.fc2(h1) Q = self.fc3(h2) return Q
class DQN(parl.Algorithm): def __init__(self, model, act_dim=None, gamma=None, lr=None): """ DQN algorithm Args: model (parl.Model): 定義Q函數的前向網絡結構 act_dim (int): action空間的維度,即有幾個action gamma (float): reward的衰減因子 lr (float): learning rate 學習率. """ self.model = model self.target_model = copy.deepcopy(model) assert isinstance(act_dim, int) assert isinstance(gamma, float) assert isinstance(lr, float) self.act_dim = act_dim self.gamma = gamma self.lr = lr def predict(self, obs): """ 使用self.model的value網絡來獲取 [Q(s,a1),Q(s,a2),...] """ return self.model.value(obs) def learn(self, obs, action, reward, next_obs, terminal): """ 使用DQN算法更新self.model的value網絡 """ # 從target_model中獲取 max Q' 的值,用於計算target_Q next_pred_value = self.target_model.value(next_obs) best_v = layers.reduce_max(next_pred_value, dim=1) best_v.stop_gradient = True # 阻止梯度傳遞 terminal = layers.cast(terminal, dtype='float32') target = reward + (1.0 - terminal) * self.gamma * best_v pred_value = self.model.value(obs) # 獲取Q預測值 # 將action轉onehot向量,比如:3 => [0,0,0,1,0] action_onehot = layers.one_hot(action, self.act_dim) action_onehot = layers.cast(action_onehot, dtype='float32') # 下面一行是逐元素相乘,拿到action對應的 Q(s,a) # 比如:pred_value = [[2.3, 5.7, 1.2, 3.9, 1.4]], action_onehot = [[0,0,0,1,0]] # ==> pred_action_value = [[3.9]] pred_action_value = layers.reduce_sum( layers.elementwise_mul(action_onehot, pred_value), dim=1) # 計算 Q(s,a) 與 target_Q的均方差,得到loss cost = layers.square_error_cost(pred_action_value, target) cost = layers.reduce_mean(cost) optimizer = fluid.optimizer.Adam(learning_rate=self.lr) # 使用Adam優化器 optimizer.minimize(cost) return cost def sync_target(self): """ 把 self.model 的模型參數值同步到 self.target_model """ self.model.sync_weights_to(self.target_model)
class Agent(parl.Agent): def __init__(self, algorithm, obs_dim, act_dim, e_greed=0.1, e_greed_decrement=0): assert isinstance(obs_dim, int) assert isinstance(act_dim, int) self.obs_dim = obs_dim self.act_dim = act_dim super(Agent, self).__init__(algorithm) self.global_step = 0 self.update_target_steps = 200 # 每隔200個training steps再把model的參數復制到target_model中 self.e_greed = e_greed # 有一定概率隨機選取動作,探索 self.e_greed_decrement = e_greed_decrement # 隨着訓練逐步收斂,探索的程度慢慢降低 def build_program(self): self.pred_program = fluid.Program() self.learn_program = fluid.Program() with fluid.program_guard(self.pred_program): # 搭建計算圖用於 預測動作,定義輸入輸出變量 obs = layers.data( name='obs', shape=[self.obs_dim], dtype='float32') self.value = self.alg.predict(obs) with fluid.program_guard(self.learn_program): # 搭建計算圖用於 更新Q網絡,定義輸入輸出變量 obs = layers.data( name='obs', shape=[self.obs_dim], dtype='float32') action = layers.data(name='act', shape=[1], dtype='int32') reward = layers.data(name='reward', shape=[], dtype='float32') next_obs = layers.data( name='next_obs', shape=[self.obs_dim], dtype='float32') terminal = layers.data(name='terminal', shape=[], dtype='bool') self.cost = self.alg.learn(obs, action, reward, next_obs, terminal) def sample(self, obs): sample = np.random.rand() # 產生0~1之間的小數 if sample < self.e_greed: act = np.random.randint(self.act_dim) # 探索:每個動作都有概率被選擇 else: act = self.predict(obs) # 選擇最優動作 self.e_greed = max( 0.01, self.e_greed - self.e_greed_decrement) # 隨着訓練逐步收斂,探索的程度慢慢降低 return act def predict(self, obs): # 選擇最優動作 obs = np.expand_dims(obs, axis=0) pred_Q = self.fluid_executor.run( self.pred_program, feed={'obs': obs.astype('float32')}, fetch_list=[self.value])[0] pred_Q = np.squeeze(pred_Q, axis=0) act = np.argmax(pred_Q) # 選擇Q最大的下標,即對應的動作 return act def learn(self, obs, act, reward, next_obs, terminal): # 每隔200個training steps同步一次model和target_model的參數 if self.global_step % self.update_target_steps == 0: self.alg.sync_target() self.global_step += 1 act = np.expand_dims(act, -1) feed = { 'obs': obs.astype('float32'), 'act': act.astype('int32'), 'reward': reward, 'next_obs': next_obs.astype('float32'), 'terminal': terminal } cost = self.fluid_executor.run( self.learn_program, feed=feed, fetch_list=[self.cost])[0] # 訓練一次網絡 return cost
Step5 ReplayMemory
import random import collections import numpy as np class ReplayMemory(object): def __init__(self, max_size): self.buffer = collections.deque(maxlen=max_size) # 增加一條經驗到經驗池中 def append(self, exp): self.buffer.append(exp) # 從經驗池中選取N條經驗出來 def sample(self, batch_size): mini_batch = random.sample(self.buffer, batch_size) obs_batch, action_batch, reward_batch, next_obs_batch, done_batch = [], [], [], [], [] for experience in mini_batch: s, a, r, s_p, done = experience obs_batch.append(s) action_batch.append(a) reward_batch.append(r) next_obs_batch.append(s_p) done_batch.append(done) return np.array(obs_batch).astype('float32'), \ np.array(action_batch).astype('float32'), np.array(reward_batch).astype('float32'),\ np.array(next_obs_batch).astype('float32'), np.array(done_batch).astype('float32') def __len__(self): return len(self.buffer)
Step6 Training && Test(訓練&&測試)
def run_episode(env, agent, rpm): total_reward = 0 obs = env.reset() step = 0 while True: step += 1 action = agent.sample(obs) # 采樣動作,所有動作都有概率被嘗試到 next_obs, reward, done, _ = env.step(action) rpm.append((obs, action, reward, next_obs, done)) # train model if (len(rpm) > MEMORY_WARMUP_SIZE) and (step % LEARN_FREQ == 0): (batch_obs, batch_action, batch_reward, batch_next_obs, batch_done) = rpm.sample(BATCH_SIZE) train_loss = agent.learn(batch_obs, batch_action, batch_reward, batch_next_obs, batch_done) # s,a,r,s',done total_reward += reward obs = next_obs if done: break return total_reward # 評估 agent, 跑 5 個episode,總reward求平均 def evaluate(env, agent, render=False): eval_reward = [] for i in range(5): obs = env.reset() episode_reward = 0 while True: action = agent.predict(obs) # 預測動作,只選最優動作 obs, reward, done, _ = env.step(action) episode_reward += reward if render: env.render() if done: break eval_reward.append(episode_reward) return np.mean(eval_reward)
Step7 創建環境和Agent,創建經驗池,啟動訓練,保存模型
env = gym.make('CartPole-v0') # CartPole-v0: 預期最后一次評估總分 > 180(最大值是200) action_dim = env.action_space.n # CartPole-v0: 2 obs_shape = env.observation_space.shape # CartPole-v0: (4,) rpm = ReplayMemory(MEMORY_SIZE) # DQN的經驗回放池 # 根據parl框架構建agent model = Model(act_dim=action_dim) algorithm = DQN(model, act_dim=action_dim, gamma=GAMMA, lr=LEARNING_RATE) agent = Agent( algorithm, obs_dim=obs_shape[0], act_dim=action_dim, e_greed=0.1, # 有一定概率隨機選取動作,探索 e_greed_decrement=1e-6) # 隨着訓練逐步收斂,探索的程度慢慢降低 # 加載模型 # save_path = './dqn_model.ckpt' # agent.restore(save_path) # 先往經驗池里存一些數據,避免最開始訓練的時候樣本豐富度不夠 while len(rpm) < MEMORY_WARMUP_SIZE: run_episode(env, agent, rpm) max_episode = 2000 # 開始訓練 episode = 0 while episode < max_episode: # 訓練max_episode個回合,test部分不計算入episode數量 # train part for i in range(0, 50): total_reward = run_episode(env, agent, rpm) episode += 1 # test part eval_reward = evaluate(env, agent, render=False) # render=True 查看顯示效果 logger.info('episode:{} e_greed:{} test_reward:{}'.format( episode, agent.e_greed, eval_reward)) # 訓練結束,保存模型 save_path = './dqn_model.ckpt' agent.save(save_path)
感覺吧,與監督學習相比,強化學習多了action,environment等概念。雖然可以將reward類比成監督學習中的label(或者反過來,label也可以認為是強化學習中最終的reward),但通過action與environment不斷的交互甚至改變environment這一特點,是監督學習中所沒有的。在構建應用的時候,監督學習的學習的目標:label,灌入的數據都是一個定值。比如,圖像的分類的問題,在用CNN訓練的時候,圖片本身不發生變化,label也不會發生變化,唯一變化的是神經網絡中的權重值。但強化學習在訓練的時候,除了神經網絡中的權重會發生變化(如果用NN建模的話),environment、reward等都會發生動態的變化。
從結果曲線來看,強化學習跟監督學習也不太一樣,監督的曲線是下降的。RL的曲線會波動的很厲害(上上下下的),不過如果模型好的話,大體上會是上升的。不知道是不是參數選擇上面還要改一改