本文首發於行者AI
離散動作與連續動作
離散動作與連續動作是相對的概念,前者可數,后者不可數。離散動作如LunarLander-v2環境,可以采取四種離散動作;連續動作如Pendulum-v1環境,動作是向左或向右轉,用力矩衡量,范圍為[-2,2]的連續空間。
對於連續的動作控制空間,Q-learning與DQN等算法是無法處理的。我們無法用這些算法窮舉出所有action的Q值,更無法取其中最大的Q值。那如何輸出連續的動作呢,我們可以借用萬能的神經網絡來處理。在離散動作的場景下,比如輸出上下左右這四個動作。有幾個動作,神經網絡就輸出幾個概率值,我們用\(\pi_{\theta}\left(a_{t} \mid s_{t}\right)\)來表示這個隨機性的策略。在連續的動作場景下,比如要輸出機器人手臂彎曲的角度,這樣的一個動作,網絡就輸出一個具體的浮點數,用\(\mu_{\theta}\left(s_{t}\right)\)來代表這個確定性的策略。
隨機性策略與確定性策略
對隨機性策略來講,輸入某一個狀態s,采取某一個動作的可能性是一個概率值,類似抽獎,根據概率隨機抽取某一個動作。而對於確定性策略來講,它沒有概率的影響。當神經網絡的參數固定下來了之后,輸入同樣的狀態,必然輸出同樣的動作,這就是確定性的策略。
深度確定性策略梯度
在連續控制領域,比較經典的強化學習算法就是深度確定性策略梯度(deep deterministic policy gradient,DDPG)。DDPG的特點可以從名字當中拆解后取理解。拆解成深度、確定性和策略梯度。 深度是用了神經網絡;確定性表示DDPG輸出的是一個確定性的動作,可以用於連續動作的場景;策略梯度代表用到策略網絡。
DDPG是DQN的一個擴展版本,可以擴展到連續動作空間。在 DDPG 的訓練中,同樣有目標網絡和經驗回收,經驗回放和DQN一樣,但目標網絡更新跟DQN 有所區別。首先回顧一下DQN的更新公式:
其中\(\gamma \max _{a} Q\left(S^{\prime}, a\right)\)只能處理離散動作,DDPG就是改變這一部分,用一個Actor網絡,使其可以處理連續動作空間。DDPG直接在DQN基礎上加了一個策略網絡直接輸出動作值,所以DDPG需要一邊學習Q網絡,一邊學習策略網絡。Q網絡的參數用w表示。策略網絡的參數用θ表示,這其實是Actor-Critic結構。如圖1所示。

圖1 從DQN到DDPG
通俗地解釋一下演員-評論員的結構,策略網絡扮演的就是演員的角色,它負責對外展示輸出,輸出舞蹈動作。Q網絡就是評論員,它會在每一個步驟都對演員輸出的動作做一個評估,打一個分,估計一下演員的動作未來能有多少收益,也就是去估計這個演員輸出這個動作的Q值大概是多少,即 Qw(s, a)。演員就需要根據舞台目前的狀態來做出一個動作。演員根據評論員的打分來調整自己的策略,也就是更新演員的神經網絡參數θ,爭取下次可以做得更好。評論員則是要根據觀眾的反饋,也就是環境的反饋獎勵來調整自己的打分策略,也就是要更新評論員的神經網絡的參數w ,目標是要讓每一場表演都獲得觀眾盡可能多的歡呼聲跟掌聲,也就是要最大化未來的總收益。
最開始訓練的時候,這兩個神經網絡參數是隨機的。所以評論員最開始是隨機打分的,然后演員也跟着亂來,就隨機表演,隨機輸出動作。但是由於我們有環境反饋的獎勵存在,所以評論員的評分會越來越准確,也會評判的那個演員的表現會越來越好。既然演員是一個神經網絡,是我們希望訓練好的策略網絡, 我們就需要計算梯度來去更新優化里面的參數 θ 。簡單來說,我們希望調整演員的網絡參數,使得評委打分盡可能得高。注意,這里的演員是不管觀眾的,它只關注評委,它就是迎合評委的打分Qw(s, a)而已。
接下來就是類似 DQN。DQN的最佳策略是想要學出一個很好的Q網絡,學好這個網絡之后,我們希望選取的那個動作使Q值最大。DDPG的目的也是為了求解讓Q值最大的那個動作。 演員只是為了迎合評委的打分而已,所以用來優化策略網絡的梯度就是要最大化這個Q值,那么構造的損失函數就是讓Q取一個負號。以此來最小化損失,也就是最大化Q。如圖2所示.

圖2 DQN與DDPG的區別與聯系
這里要注意,除了策略網絡要做優化,DDPG還有一個Q網絡也要優化。評委一開始也不知道怎么評分,它也是在一步一步的學習當中,慢慢地去給出准確的打分。我們優化Q網絡的方法其實跟DQN優化Q網絡的方法是一樣的,我們用真實的獎勵r和下一步的Q(即Q’)來去擬合未來的收益 Q_target。然后讓Q網絡的輸出去逼近這個Q_target。構造的損失函數就是直接求這兩個值的均方差。
為了穩定 Q_target,DDPG分別給Q網絡和策略網絡都搭建了目標網絡,和DQN類似。策略網絡的目標網絡和Q網絡的目標網絡這兩個網絡是固定一段時間的參數之后再跟評估網絡同步一下最新的參數。算法流程如圖3所示。

圖3 DDPG算法流程
代碼實現
案例: 倒立擺問題。鍾擺以隨機位置開始,目標是將其向上擺動,使其保持直立。 測試環境: Pendulum-v1
動作:往左轉還是往右轉,用力矩來衡量,即力乘以力臂。范圍[-2,2]:(連續空間)
狀態:cos(theta), sin(theta) , thetadot。
獎勵:越直立拿到的獎勵越高,越偏離,獎勵越低。獎勵的最大值為0。
定義網絡結構:
class ValueNetwork(nn.Module):
def __init__(self, num_inputs, num_action, hidden_size, init_w=3e-3):
super(ValueNetwork, self).__init__()
self.linear1 = nn.Linear(num_inputs + num_action, hidden_size)
self.linear2 = nn.Linear(hidden_size, hidden_size)
self.linear3 = nn.Linear(hidden_size, 1)
self.linear3.weight.data.uniform_(-init_w, init_w)
self.linear3.bias.data.uniform_(-init_w, init_w)
def forward(self, state, action):
x = torch.cat([state, action], 1)
x = F.relu(self.linear1(x))
x = F.relu(self.linear2(x))
x = self.linear3(x)
return x
class PolicyNetwork(nn.Module):
def __init__(self, num_inputs, num_actions, hidden_size, init_W = 3e-3):
super(PolicyNetwork, self).__init__()
self.linear1 = nn.Linear(num_inputs, hidden_size)
self.linear2 = nn.Linear(hidden_size, hidden_size)
self.linear3 = nn.Linear(hidden_size, num_actions)
self.linear3.weight.data.uniform_(-init_W, init_W)
self.linear3.weight.data.uniform_(-init_W, init_W)
def forward(self, x):
x = F.relu(self.linear1(x))
x = F.relu(self.linear2(x))
x = F.tanh(self.linear3(x))
return x
def get_action(self, state):
state = torch.FloatTensor(state).unsqueeze(0).to(device)
action = self.forward(state)
return action.detach().cpu().numpy()[0,0]
定義DDPG類:
class DDPG(object):
def __init__(self, action_dim, state_dim, hidden_dim):
super(DDPG, self).__init__()
self.action_dim, self.state_dim, self.hidden_dim = action_dim, state_dim, hidden_dim
self.batch_size = 128
self.gamma = 0.99
self.min_value = -np.inf
self.max_value = np.inf
self.soft_tau = 1e-2
self.replay_buffer_size = 5000
self.value_lr = 1e-3
self.policy_lr = 1e-4
self.update_count = 0
self.value_net = ValueNetwork(state_dim, action_dim, hidden_dim).to(device)
self.policy_net = PolicyNetwork(state_dim, action_dim, hidden_dim).to(device)
self.target_value_net = ValueNetwork(state_dim, action_dim, hidden_dim).to(device)
self.target_policy_net = PolicyNetwork(state_dim, action_dim, hidden_dim).to(device)
for target_param, param in zip(self.target_value_net.parameters(), self.value_net.parameters()):
target_param.data.copy_(param.data)
for target_param, param in zip(self.target_policy_net.parameters(), self.policy_net.parameters()):
target_param.data.copy_(param.data)
self.value_optimizer = optim.Adam(self.value_net.parameters(), lr=self.value_lr)
self.policy_optimizer = optim.Adam(self.policy_net.parameters(), lr=self.policy_lr)
self.value_criterion = nn.MSELoss()
self.replay_buffer = ReplayBuffer(self.replay_buffer_size)
def update(self):
state, action, reward, next_state, done = self.replay_buffer.sample(self.batch_size)
state = torch.FloatTensor(state).to(device)
next_state = torch.FloatTensor(next_state).to(device)
action = torch.FloatTensor(action).to(device)
reward = torch.FloatTensor(reward).unsqueeze(1).to(device)
done = torch.FloatTensor(np.float32(done)).unsqueeze(1).to(device)
policy_loss = self.value_net(state, self.policy_net(state))
policy_loss = - policy_loss.mean()
next_action = self.target_policy_net(next_state)
target_value = self.target_value_net(next_state, next_action)
expected_value = reward + (1.0 - done) * self.gamma * target_value
expected_value = torch.clamp(expected_value, self.min_value, self.max_value)
value = self.value_net(state, action)
value_loss = self.value_criterion(value, expected_value.detach())
for name, param in self.value_net.named_parameters():
param.requires_grad = False
self.policy_optimizer.zero_grad()
policy_loss.backward()
self.policy_optimizer.step()
for name, param in self.value_net.named_parameters():
param.requires_grad = True
if self.update_count % 2 == 0:
self.value_optimizer.zero_grad()
value_loss.backward()
self.value_optimizer.step()
self.update_count += 1
for target_param, param in zip(self.target_value_net.parameters(), self.value_net.parameters()):
target_param.data.copy_(
target_param.data * (1.0 - self.soft_tau) + param.data * self.soft_tau
)
for target_param, param in zip(self.target_policy_net.parameters(), self.policy_net.parameters()):
target_param.data.copy_(
target_param.data * (1.0 - self.soft_tau) + param.data * self.soft_tau
)
訓練模型:
def main():
env = gym.make('Pendulum-v1')
env = NormalizedActions(env)
ou_noise = OUnoise(env.action_space)
state_dim = env.observation_space.shape[0]
action_dim = env.action_space.shape[0]
hidden_dim = 256
ddpg = DDPG(action_dim, state_dim, hidden_dim)
max_frames = 50000
max_steps = 500
frame_idx = 0
rewards = []
batch_size = 128
while frame_idx < max_frames:
state = env.reset()
ou_noise.reset()
episode_reward = 0
for step in range(max_steps):
env.render()
action = ddpg.policy_net.get_action(state)
action = ou_noise.get_action(action, step)
next_state, reward, done, _ = env.step(action)
ddpg.replay_buffer.push(state, action, reward, next_state, done)
if len(ddpg.replay_buffer) > batch_size:
ddpg.update()
state = next_state
episode_reward += reward
frame_idx += 1
if done:
break
rewards.append(episode_reward)
env.close()
plot(frame_idx, rewards)
在更新policy網絡時,未凍結value網絡的參數,最終Reward曲線如圖4所示:

圖4 Reward曲線
在更新policy網絡時,凍結value網絡的參數,最終Reward曲線如圖5所示:

圖5 Reward曲線
可以看出在更新policy網絡時,由於沒有凍結value網絡的參數,使得更新波動較大,且更新目標不穩定,所以導致結果不如凍結后得到的reward。
總結
DDPG通過異策略的方式來訓練一個確定性策略,在DQN的基礎上做優化,較好的解決了連續動作空間處理的問題。
參考文獻
[1]《Reinforcement+Learning: An+Introduction》
[2] https://blog.csdn.net/qq_37395293/article/details/114226081
我們是行者AI,我們在“AI+游戲”中不斷前行。
前往公眾號 【行者AI】,和我們一起探討技術問題吧!
