單臂擺是強化學習的一個經典模型,本文采用了4種不同的算法來解決這個問題,使用Pytorch實現。
DQN:
參考:
算法思想:
https://mofanpy.com/tutorials/machine-learning/torch/DQN/
算法實現
https://pytorch.org/tutorials/intermediate/reinforcement_q_learning.html
個人理解:DQN算法將Q學習和神經網絡算法結合,解決了狀態空間連續的問題。由於Q學習是off-policy的,所以需要target網絡,即需要一個滯后版本的神經網絡,防止一些並非最優的動作被采樣之后,該動作的reward增加,之后就一直選擇該非最優動作,從而影響學習的效率。由於神經網絡的輸入和Target要求獨立同分布,所以采用ReplayBuffer和隨機采樣來解決這個問題。DQN的神經網絡目標是讓Q值預測的更准,所以loss是target和eval的均方誤差(MSE)。
代碼:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import gym
import random
import numpy as np
from collections import namedtuple

GAMMA = 0.99
lr = 0.1
EPSION = 0.1          # epsilon for epsilon-greedy exploration
buffer_size = 10000   # capacity of the replay buffer
batch_size = 32
num_episode = 100000
target_update = 10    # copy net's weights into target_net every N episodes


class Net(nn.Module):
    """3-layer MLP mapping a state vector to one Q-value per action."""

    def __init__(self, input_size, hidden_size, output_size):
        super(Net, self).__init__()
        self.Linear1 = nn.Linear(input_size, hidden_size)
        self.Linear2 = nn.Linear(hidden_size, hidden_size)
        self.Linear3 = nn.Linear(hidden_size, output_size)

    def forward(self, x):
        x = F.relu(self.Linear1(x))
        x = F.relu(self.Linear2(x))
        x = self.Linear3(x)
        return x


# namedtuple container for one environment transition
Transition = namedtuple('Transition',
                        ('state', 'action', 'reward', 'done', 'next_state'))


class ReplayMemory(object):
    """Fixed-capacity circular buffer of transitions."""

    def __init__(self, capacity):
        self.capacity = capacity
        self.memory = []
        self.position = 0

    def push(self, *args):
        # grow until full, then overwrite oldest entries in-place
        if len(self.memory) < self.capacity:
            self.memory.append(None)
        self.memory[self.position] = Transition(*args)
        self.position = (self.position + 1) % self.capacity

    def sample(self, batch_size):
        # uniform random sample to break temporal correlation
        return random.sample(self.memory, batch_size)

    def __len__(self):
        return len(self.memory)


class DQN(object):
    """DQN agent: online net + lagged target net + replay buffer."""

    def __init__(self, input_size, hidden_size, output_size):
        self.net = Net(input_size, hidden_size, output_size)
        self.target_net = Net(input_size, hidden_size, output_size)
        self.optim = optim.Adam(self.net.parameters(), lr=lr)
        self.target_net.load_state_dict(self.net.state_dict())
        self.buffer = ReplayMemory(buffer_size)
        self.loss_func = nn.MSELoss()
        self.steps_done = 0

    def put(self, s0, a0, r, t, s1):
        """Store one transition; t is 1 for terminal, else 0."""
        self.buffer.push(s0, a0, r, t, s1)

    def select_action(self, state):
        """Epsilon-greedy: greedy with prob 1-EPSION, otherwise random."""
        eps_threshold = random.random()
        action = self.net(torch.Tensor(state))
        if eps_threshold > EPSION:
            choice = torch.argmax(action).numpy()
        else:
            # random integer in [0, num_actions)
            choice = np.random.randint(0, action.shape[0])
        return choice

    def update_parameters(self):
        """One TD-learning step on a random minibatch from the buffer."""
        if self.buffer.__len__() < batch_size:
            return
        samples = self.buffer.sample(batch_size)
        batch = Transition(*zip(*samples))
        # stack actions into a (B, 1) integer column for gather()
        tmp = np.vstack(batch.action)
        state_batch = torch.Tensor(batch.state)
        action_batch = torch.LongTensor(tmp.astype(int))
        reward_batch = torch.Tensor(batch.reward)
        done_batch = torch.Tensor(batch.done)
        next_state_batch = torch.Tensor(batch.next_state)
        # max_a' Q_target(s', a'), detached: target net provides a fixed target
        q_next = torch.max(self.target_net(next_state_batch).detach(), dim=1,
                           keepdim=True)[0]
        q_eval = self.net(state_batch).gather(1, action_batch)
        # BUGFIX: done_batch was (B,) while q_next is (B, 1); the product
        # broadcast to (B, B) and MSELoss silently compared (B,1) vs (B,B).
        # Unsqueeze the terminal mask so every tensor here is (B, 1).
        q_tar = reward_batch.unsqueeze(1) \
            + (1 - done_batch).unsqueeze(1) * GAMMA * q_next
        loss = self.loss_func(q_eval, q_tar)
        self.optim.zero_grad()
        loss.backward()
        self.optim.step()


if __name__ == '__main__':
    env = gym.make('CartPole-v0')
    # state space: 4-dim continuous; action space: discrete {0, 1}
    Agent = DQN(env.observation_space.shape[0], 256, env.action_space.n)
    average_reward = 0  # running mean of all episode rewards so far
    for i_episode in range(num_episode):
        s0 = env.reset()
        tot_reward = 0  # total reward of this episode
        tot_time = 0    # survival time (reward definitions may differ)
        while True:
            env.render()
            a0 = Agent.select_action(s0)
            s1, r, done, _ = env.step(a0)
            tot_time += r
            tot_reward += r
            # terminal flag must be stored, otherwise the bootstrap target
            # is wrong at episode ends and learning is much harder
            t = 1 if done else 0
            Agent.put(s0, a0, r, t, s1)
            s0 = s1
            Agent.update_parameters()
            if done:
                average_reward = average_reward + 1 / (i_episode + 1) * (
                    tot_reward - average_reward)
                print('Episode ', i_episode, 'tot_time: ', tot_time,
                      ' tot_reward: ', tot_reward, ' average_reward: ',
                      average_reward)
                break
        if i_episode % target_update == 0:
            Agent.target_net.load_state_dict(Agent.net.state_dict())
有一個點需要注意,網上有些DQN的實現沒有考慮終止狀態,所以需要修改Reward才能達到好的效果。在考慮終止狀態后,使用原始的reward就可以學習。
Reinforce:
參考:
思路及代碼:
https://blog.csdn.net/qq_37266917/article/details/109855244
個人理解:
Reinforce是一種策略梯度算法,對參數化的策略進行梯度上升。需要注意網絡不能太復雜,不然會過擬合導致很難學習。通過策略梯度定理,我們知道了怎么進行梯度上升。概率前面的回報可以看成梯度上升的幅度,即回報越大提升的概率也越多。所以在Policy Gradient中引入了基線(baseline),以防止某些非最優的動作被選擇之后概率變得過大(雖然在樣本足夠多的時候這個問題也能解決)
神經網絡的loss是 t時刻的回報 * t時刻的動作的概率取對數。以及要取負,因為神經網絡是梯度下降,最小化loss,取負數就是最大化回報。
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import gym
import random
import numpy as np
from torch.distributions import Categorical
from collections import deque
from collections import namedtuple

GAMMA = 1.0
lr = 0.1
EPSION = 0.9
buffer_size = 10000
batch_size = 32
num_episode = 100000
target_update = 10
EPS_START = 0.9
EPS_END = 0.05
EPS_DECAY = 200


class Policy(nn.Module):
    """Small policy network: state -> softmax action probabilities.

    Deliberately shallow — a deeper net overfits and learns poorly here.
    """

    def __init__(self, input_size, hidden_size, output_size):
        super(Policy, self).__init__()
        self.Linear1 = nn.Linear(input_size, hidden_size)
        self.Linear1.weight.data.normal_(0, 0.1)
        self.Linear3 = nn.Linear(hidden_size, output_size)
        self.Linear3.weight.data.normal_(0, 0.1)

    def forward(self, x):
        hidden = F.relu(self.Linear1(x))
        return F.softmax(self.Linear3(hidden), dim=1)


class Reinforce(object):
    """Vanilla REINFORCE (Monte-Carlo policy gradient) agent."""

    def __init__(self, input_size, hidden_size, output_size):
        self.net = Policy(input_size, hidden_size, output_size)
        self.optim = optim.Adam(self.net.parameters(), lr=0.01)

    def select_action(self, s):
        """Sample an action; return (action index, log-probability)."""
        state_t = torch.Tensor(s).unsqueeze(0)
        dist = Categorical(self.net(state_t))
        action = dist.sample()
        return action.item(), dist.log_prob(action)

    def update_parameters(self, rewards, log_probs):
        """Gradient ascent on sum_t R_t * log pi(a_t|s_t).

        R_t is the discounted return from step t onward; loss is negated
        because the optimizer minimizes.
        """
        ret = 0
        loss = 0
        for step in reversed(range(len(rewards))):
            ret = rewards[step] + GAMMA * ret
            loss = loss - ret * log_probs[step]
        self.optim.zero_grad()
        loss.backward()
        self.optim.step()


if __name__ == '__main__':
    env = gym.make('CartPole-v0')
    average_reward = 0
    Agent = Reinforce(env.observation_space.shape[0], 16, env.action_space.n)
    for i_episode in range(1, num_episode + 1):
        s = env.reset()
        log_probs = []
        rewards = []
        while True:
            env.render()
            a, prob = Agent.select_action(s)
            s1, r, done, _ = env.step(a)
            log_probs.append(prob)
            rewards.append(r)
            s = s1
            if done:
                average_reward = average_reward + (1 / (i_episode + 1)) * (
                    np.sum(rewards) - average_reward)
                if i_episode % 100 == 0:
                    print('episode: ', i_episode, "tot_rewards: ",
                          np.sum(rewards), 'average_rewards: ',
                          average_reward)
                break
        # one policy-gradient update per full episode (Monte-Carlo)
        Agent.update_parameters(rewards, log_probs)
DDPG:
參考:
思路:https://www.cnblogs.com/pinard/p/10345762.html
實現:https://zhuanlan.zhihu.com/p/99406809
個人理解:DDPG算法采用了Actor-Critic框架,像是DQN和Policy Gradient的結合。在DDPG中,Actor輸出的是一個具體的動作,而不是動作的概率分布,Critic輸出的是動作的Q值。Actor和Critic都需要一個Target網絡,需要ReplayBuffer打破相關性。網上我沒找到用DDPG和Pytorch解決單臂桿問題的代碼,所以我的解決方法可能不是最好的。因為單臂桿的動作是離散的2個(0,1),最開始我給Actor設置了2個輸出並用argmax決定是哪個。后面發現argmax沒有梯度,於是我將輸出改為了一個,並套了一層sigmoid,輸出小於0.5當0算,大於0.5當1算。Critic的loss和DQN類似,都是target和eval的均方誤差(MSE)。Actor的loss需要自己先輸出a,再用critic得到估值,求平均值,再取負。
代碼:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import gym
import random
import numpy as np
from collections import namedtuple
import math

GAMMA = 0.9
lr = 0.1
EPSION = 0.9
buffer_size = 10000
batch_size = 32
num_episode = 100000
target_update = 10
EPS_START = 0.9
EPS_END = 0.05
EPS_DECAY = 200
tau = 0.02  # mixing rate for the soft target-network update


class Actor(nn.Module):
    """Deterministic policy: state -> sigmoid output in (0, 1).

    The scalar output is later rounded to pick discrete action 0 or 1
    (argmax over two heads would have no gradient).
    """

    def __init__(self, input_size, hidden_size, output_size):
        super(Actor, self).__init__()
        self.Linear1 = nn.Linear(input_size, hidden_size)
        self.Linear1.weight.data.normal_(0, 0.1)
        self.Linear3 = nn.Linear(hidden_size, output_size)
        self.Linear3.weight.data.normal_(0, 0.1)

    def forward(self, x):
        h = F.relu(self.Linear1(x))
        return torch.sigmoid(self.Linear3(h))


class Critic(nn.Module):
    """Q-network: concatenated (state, action) -> scalar value."""

    def __init__(self, input_size, hidden_size, output_size):
        super(Critic, self).__init__()
        self.Linear1 = nn.Linear(input_size, hidden_size)
        self.Linear1.weight.data.normal_(0, 0.1)
        self.Linear3 = nn.Linear(hidden_size, output_size)
        self.Linear3.weight.data.normal_(0, 0.1)

    def forward(self, s, a):
        joined = torch.cat([s, a], dim=1)
        h = F.relu(self.Linear1(joined))
        return self.Linear3(h)


# namedtuple container for one transition
Transition = namedtuple('Transition',
                        ('state', 'action', 'reward', 'next_state', 'done'))


class ReplayMemory(object):
    """Fixed-capacity circular buffer of transitions."""

    def __init__(self, capacity):
        self.capacity = capacity
        self.memory = []
        self.position = 0

    def push(self, *args):
        if len(self.memory) < self.capacity:
            self.memory.append(None)
        self.memory[self.position] = Transition(*args)
        self.position = (self.position + 1) % self.capacity

    def sample(self, batch_size):
        # uniform random sample to break temporal correlation
        return random.sample(self.memory, batch_size)

    def __len__(self):
        return len(self.memory)


class DDPG(object):
    """DDPG agent: actor/critic plus soft-updated target copies.

    NOTE(review): output_size is accepted but unused — network widths are
    driven by action_shape; kept for interface compatibility.
    """

    def __init__(self, input_size, action_shape, hidden_size, output_size):
        self.actor = Actor(input_size, hidden_size, action_shape)
        self.actor_target = Actor(input_size, hidden_size, action_shape)
        self.critic = Critic(input_size + action_shape, hidden_size,
                             action_shape)
        self.critic_target = Critic(input_size + action_shape, hidden_size,
                                    action_shape)
        self.actor_optim = optim.Adam(self.actor.parameters(), lr=0.01)
        self.critic_optim = optim.Adam(self.critic.parameters(), lr=0.01)
        self.actor_target.load_state_dict(self.actor.state_dict())
        self.critic_target.load_state_dict(self.critic.state_dict())
        self.buffer = ReplayMemory(buffer_size)
        self.loss_func = nn.MSELoss()
        self.steps_done = 0

    def put(self, s0, a0, r, s1, done):
        # caller passes 1-done, so the stored flag is 0 at terminal states
        self.buffer.push(s0, a0, r, s1, done)

    def select_action(self, state):
        """Return the actor's raw tensor output for this state."""
        return self.actor(torch.Tensor(state))

    def update_parameters(self):
        """One critic TD step, one actor ascent step, then soft updates."""
        if self.buffer.__len__() < batch_size:
            return
        sampled = self.buffer.sample(batch_size)
        batch = Transition(*zip(*sampled))
        state_batch = torch.Tensor(batch.state)
        action_batch = torch.Tensor(batch.action).unsqueeze(0).view(-1, 1)
        reward_batch = torch.Tensor(batch.reward)
        next_state_batch = torch.Tensor(batch.next_state)
        done_batch = torch.Tensor(batch.done)
        # ---- critic update: MSE between Q(s,a) and r + gamma * Q'(s',a') ----
        next_action_batch = self.actor_target(
            next_state_batch).unsqueeze(0).detach().view(-1, 1)
        r_eval = self.critic(state_batch, action_batch)
        # done_batch already holds 1-done, so it zeroes the bootstrap term
        r_target = reward_batch + GAMMA * self.critic_target(
            next_state_batch, next_action_batch).detach().view(1, -1) * done_batch
        r_eval = torch.squeeze(r_eval)
        r_target = torch.squeeze(r_target)
        critic_loss = self.loss_func(r_eval, r_target)
        self.critic_optim.zero_grad()
        critic_loss.backward()
        self.critic_optim.step()
        # ---- actor update: maximize mean Q(s, actor(s)) => minimize -mean ----
        fresh_action = self.actor(state_batch).unsqueeze(0).view(-1, 1)
        actor_loss = -torch.mean(self.critic(state_batch, fresh_action))
        self.actor_optim.zero_grad()
        actor_loss.backward()
        self.actor_optim.step()

        # soft update: target <- (1 - tau) * target + tau * online
        def soft_update(net_target, net):
            for target_param, param in zip(net_target.parameters(),
                                           net.parameters()):
                target_param.data.copy_(
                    target_param.data * (1.0 - tau) + param.data * tau)

        soft_update(self.actor_target, self.actor)
        soft_update(self.critic_target, self.critic)


if __name__ == '__main__':
    env = gym.make('CartPole-v0')
    Agent = DDPG(env.observation_space.shape[0], 1, 16, env.action_space.n)
    average_reward = 0
    for i_episode in range(num_episode):
        s0 = env.reset()
        tot_reward = 0
        tot_time = 0
        while True:
            env.render()
            a0 = Agent.select_action(s0)
            # round the sigmoid output to get a discrete action in {0, 1}
            s1, r, done, _ = env.step(round(a0.detach().numpy()[0]))
            tot_time += r
            tot_reward += r
            # storing the terminal flag matters — learning is much harder
            # without it
            Agent.put(s0, a0, r, s1, 1 - done)
            s0 = s1
            Agent.update_parameters()
            if done:
                average_reward = average_reward + 1 / (i_episode + 1) * (
                    tot_time - average_reward)
                print('Episode ', i_episode, 'tot_time: ', tot_time,
                      ' tot_reward: ', tot_reward, ' average_reward: ',
                      average_reward)
                break
PPO:
參考:
PPO算法流程及思想:
https://blog.csdn.net/qq_30615903/article/details/86308045
https://www.jianshu.com/p/9f113adc0c50
PPO算法的實現:
https://blog.csdn.net/weixin_42165585/article/details/112362125
個人理解:
PPO算法也是Actor-Critic架構,但是與DDPG不同,PPO為on-policy算法,所以不需要設計target網絡,也不需要ReplayBuffer, 並且Actor和Critic的網絡參數可以共享以便加快學習。PPO引入了重要度采樣,使得每個episode的數據可以被多訓練幾次(實際的情況中,采樣可能非常耗時)從而節省時間,clip保證了更新的幅度不會太大。
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from collections import namedtuple
import random
import gym
import math

lr = 0.0005
Capacity = 10000
num_epidose = 10000
Gamma = 0.98
lmbda = 0.95      # GAE lambda
eps_clip = 0.1    # PPO clipping range


class Net(nn.Module):
    """Shared torso with separate actor (policy) and critic (value) heads."""

    def __init__(self, input_size, hidden_size, output_size):
        super(Net, self).__init__()
        self.Linear1 = nn.Linear(input_size, hidden_size)
        self.Linear_actor = nn.Linear(hidden_size, output_size)
        self.Linear_critic = nn.Linear(hidden_size, 1)

    def actor_forward(self, s, dim):
        """Action probabilities; dim is the softmax axis (0 for a single
        state vector, 1 for a batch)."""
        s = F.relu(self.Linear1(s))
        prob = F.softmax(self.Linear_actor(s), dim=dim)
        return prob

    def critic_forward(self, s):
        """State-value estimate V(s)."""
        s = F.relu(self.Linear1(s))
        return self.Linear_critic(s)


Transition = namedtuple('Transition',
                        ('state', 'action', 'reward', 'next_state', 'rate',
                         'done'))


class ReplayBuffer(object):
    """On-policy rollout storage; cleared after every update."""

    def __init__(self, capacity):
        self.capacity = capacity
        self.memory = []
        self.position = 0

    def push(self, *args):
        if len(self.memory) < self.capacity:
            self.memory.append(None)
        self.memory[self.position] = Transition(*args)
        self.position = (self.position + 1) % self.capacity

    def sample(self, batch_size):
        return random.sample(self.memory, batch_size)

    def __len__(self):
        return len(self.memory)

    def clean(self):
        self.position = 0
        self.memory = []


class PPO(object):
    """PPO-clip agent with a shared actor-critic network."""

    def __init__(self, input_size, hidden_size, output_size):
        super(PPO, self).__init__()
        self.net = Net(input_size, hidden_size, output_size)
        self.optim = optim.Adam(self.net.parameters(), lr=lr)
        self.buffer = ReplayBuffer(capacity=Capacity)

    def act(self, s, dim):
        s = torch.Tensor(s)
        return self.net.actor_forward(s, dim)

    def critic(self, s):
        return self.net.critic_forward(s)

    def put(self, s0, a0, r, s1, rate, done):
        # rate = probability of the chosen action under the behavior policy;
        # done is stored inverted (1 - done) so it masks the bootstrap term
        self.buffer.push(s0, a0, r, s1, rate, done)

    def make_batch(self):
        """Convert the stored rollout into stacked tensors.

        Returns (state, action, reward, next_state, done, rate) batches.
        """
        batch = Transition(*zip(*self.buffer.memory))
        # BUGFIX: state was reshaped with .view(-1, 1), which mangles the
        # 4-dim observation vectors; stack them as (N, obs_dim) instead.
        state_batch = torch.Tensor(batch.state)
        action_batch = torch.LongTensor(batch.action).view(-1, 1)
        reward_batch = torch.Tensor(batch.reward).view(-1, 1)
        next_state_batch = torch.Tensor(batch.next_state)
        rate_batch = torch.Tensor(batch.rate).view(-1, 1)
        done_batch = torch.LongTensor(batch.done).view(-1, 1)
        return (state_batch, action_batch, reward_batch, next_state_batch,
                done_batch, rate_batch)

    def update_parameters(self):
        """Run 3 PPO-clip epochs over the current rollout."""
        batch = Transition(*zip(*self.buffer.memory))
        state_batch = torch.Tensor(batch.state)
        action_batch = torch.LongTensor(batch.action).view(-1, 1)
        reward_batch = torch.Tensor(batch.reward).view(-1, 1)
        next_state_batch = torch.Tensor(batch.next_state)
        rate_batch = torch.Tensor(batch.rate).view(-1, 1)
        done_batch = torch.LongTensor(batch.done).view(-1, 1)
        for i in range(3):
            # done_batch holds 1-done, masking V(s') at terminal states
            td_target = reward_batch \
                + Gamma * self.critic(next_state_batch) * done_batch
            delta = td_target - self.critic(state_batch)
            delta = delta.detach().numpy()
            # Generalized Advantage Estimation, backwards over the rollout.
            # BUGFIX: lmbda was defined but unused; the GAE recursion is
            # A_t = gamma * lambda * A_{t+1} + delta_t.
            advantage_list = []
            advantage = 0.0
            for delta_t in delta[::-1]:
                advantage = Gamma * lmbda * advantage + delta_t
                advantage_list.append(advantage)
            advantage_list.reverse()
            advantage = torch.Tensor(advantage_list)
            prob = self.act(state_batch, 1).squeeze(0)
            prob_a = prob.gather(1, action_batch.view(-1, 1))
            # importance ratio pi_new(a|s) / pi_old(a|s), via log-space
            ratio = torch.exp(torch.log(prob_a) - torch.log(rate_batch))
            surr1 = ratio * advantage
            surr2 = torch.clamp(ratio, 1 - eps_clip, 1 + eps_clip) * advantage
            # clipped policy loss + value-function regression loss
            loss = -torch.min(surr1, surr2) + F.smooth_l1_loss(
                self.critic(state_batch), td_target.detach())
            self.optim.zero_grad()
            loss.mean().backward()
            self.optim.step()


if __name__ == '__main__':
    env = gym.make('CartPole-v0')
    Agent = PPO(env.observation_space.shape[0], 256, env.action_space.n)
    average_reward = 0
    for i_episode in range(num_epidose):
        s0 = env.reset()
        tot_reward = 0
        while True:
            env.render()
            prob = Agent.act(torch.from_numpy(s0).float(), 0)
            a0 = int(prob.multinomial(1))
            s1, r, done, _ = env.step(a0)
            rate = prob[a0].item()
            Agent.put(s0, a0, r, s1, rate, 1 - done)
            s0 = s1
            tot_reward += r
            if done:
                average_reward = average_reward + 1 / (i_episode + 1) * (
                    tot_reward - average_reward)
                if i_episode % 20 == 0:
                    print('Episode ', i_episode, ' tot_reward: ', tot_reward,
                          ' average_reward: ', average_reward)
                break
        Agent.update_parameters()
        Agent.buffer.clean()
