強化學習 單臂擺(CartPole) (DQN, Reinforce, DDPG, PPO)Pytorch


單臂擺是強化學習的一個經典模型,本文采用了4種不同的算法來解決這個問題,使用Pytorch實現。

DQN:

參考:

算法思想:

https://mofanpy.com/tutorials/machine-learning/torch/DQN/

算法實現

https://pytorch.org/tutorials/intermediate/reinforcement_q_learning.html

個人理解:DQN算法將Q學習和神經網絡算法結合,解決了狀態空間連續的問題。由於Q學習是off-policy的,所以需要target網絡,即需要一個滯后版本的神經網絡,防止一些並非最優的動作被采樣之后,該動作的reward增加,之后就一直選擇該非最優動作,從而影響學習的效率。由於神經網絡的輸入和Target要求獨立同分布,所以采用ReplayBuffer和隨機采樣來解決這個問題。DQN的神經網絡目標是讓Q值預測的更准,所以loss是target和eval的完全平方差。

代碼:

import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import gym
import random
import numpy as np
from collections import namedtuple

# Hyperparameters for the DQN listing.
GAMMA = 0.99  # discount factor for future rewards
lr = 0.1  # learning rate passed to the Adam optimizer
EPSION = 0.1  # epsilon for epsilon-greedy exploration (sic: "epsilon")
buffer_size = 10000  # capacity of the experience replay buffer
batch_size = 32  # minibatch size sampled from the replay buffer
num_episode = 100000  # number of training episodes
target_update = 10  # copy the online net into the target net every N episodes


# Q-network: a three-layer MLP mapping a state vector to one Q-value per action.
class Net(nn.Module):
    def __init__(self, input_size, hidden_size, output_size):
        super(Net, self).__init__()
        self.Linear1 = nn.Linear(input_size, hidden_size)
        self.Linear2 = nn.Linear(hidden_size, hidden_size)
        self.Linear3 = nn.Linear(hidden_size, output_size)

    def forward(self, x):
        """Return Q-value estimates for every action given state(s) x."""
        hidden = F.relu(self.Linear1(x))
        hidden = F.relu(self.Linear2(hidden))
        return self.Linear3(hidden)


# Container for one replay-buffer transition.
Transition = namedtuple('Transition',
                        ('state', 'action', 'reward', 'done', 'next_state'))


class ReplayMemory(object):
    """Fixed-capacity circular buffer of transitions for experience replay."""

    def __init__(self, capacity):
        self.capacity = capacity  # maximum number of stored transitions
        self.memory = []
        self.position = 0  # slot that will be written next

    def push(self, *args):
        """Store one transition, overwriting the oldest entry once full."""
        entry = Transition(*args)
        if len(self.memory) == self.capacity:
            self.memory[self.position] = entry
        else:
            self.memory.append(entry)
        self.position = (self.position + 1) % self.capacity

    def sample(self, batch_size):
        """Draw batch_size transitions uniformly, without replacement."""
        return random.sample(self.memory, batch_size)

    def __len__(self):
        return len(self.memory)


class DQN(object):
    """DQN agent: an online net, a lagging target net and a replay buffer.

    The target net is refreshed from the online net by the training loop
    (every `target_update` episodes) to stabilise the TD targets.
    """

    def __init__(self, input_size, hidden_size, output_size):
        self.net = Net(input_size, hidden_size, output_size)
        self.target_net = Net(input_size, hidden_size, output_size)
        self.optim = optim.Adam(self.net.parameters(), lr=lr)

        # Start the target net as an exact copy of the online net.
        self.target_net.load_state_dict(self.net.state_dict())
        self.buffer = ReplayMemory(buffer_size)
        self.loss_func = nn.MSELoss()
        self.steps_done = 0

    def put(self, s0, a0, r, t, s1):
        """Store one transition (state, action, reward, done flag, next state)."""
        self.buffer.push(s0, a0, r, t, s1)

    def select_action(self, state):
        """Epsilon-greedy action selection; returns an integer action index."""
        eps_threshold = random.random()
        # No gradients are needed for action selection; avoid building a graph.
        with torch.no_grad():
            q_values = self.net(torch.Tensor(state))
        if eps_threshold > EPSION:
            choice = int(torch.argmax(q_values).item())
        else:
            # Random integer in [0, num_actions).
            choice = np.random.randint(0, q_values.shape[0])
        return choice

    def update_parameters(self):
        """Sample a minibatch and take one TD-learning step on the online net."""
        if len(self.buffer) < batch_size:
            return
        samples = self.buffer.sample(batch_size)
        batch = Transition(*zip(*samples))
        # Convert the transposed tuples into batched tensors.
        state_batch = torch.Tensor(np.vstack(batch.state))
        action_batch = torch.LongTensor(np.vstack(batch.action).astype(int))
        # BUG FIX: reward/done must be shaped (B, 1) to match q_next; the
        # original (B,) tensors broadcast (B,)*(B,1) into a (B, B) target,
        # silently corrupting the loss.
        reward_batch = torch.Tensor(batch.reward).unsqueeze(1)
        done_batch = torch.Tensor(batch.done).unsqueeze(1)
        next_state_batch = torch.Tensor(np.vstack(batch.next_state))

        # Max Q over actions from the frozen target network: shape (B, 1).
        q_next = torch.max(self.target_net(next_state_batch).detach(), dim=1,
                           keepdim=True)[0]
        q_eval = self.net(state_batch).gather(1, action_batch)
        # Terminal transitions (done == 1) do not bootstrap from q_next.
        q_tar = reward_batch + (1 - done_batch) * GAMMA * q_next
        loss = self.loss_func(q_eval, q_tar)
        self.optim.zero_grad()
        loss.backward()
        self.optim.step()


if __name__ == '__main__':
    env = gym.make('CartPole-v0')
    # State space: 4-dimensional continuous observation.
    # Action space: discrete with exactly two actions (0 and 1).
    Agent = DQN(env.observation_space.shape[0], 256, env.action_space.n)
    average_reward = 0  # running mean of all episode rewards so far
    for i_episode in range(num_episode):
        s0 = env.reset()
        tot_reward = 0  # total reward of the current episode
        tot_time = 0  # episode length in steps (reward definitions may differ)
        while True:
            env.render()
            a0 = Agent.select_action(s0)
            s1, r, done, _ = env.step(a0)
            tot_time += r  # accumulate the episode duration
            # Alternative reward shaping found online (kept for reference):
            # x, x_dot, theta, theta_dot = s1
            # r1 = (env.x_threshold - abs(x)) / env.x_threshold - 0.8
            # r2 = (env.theta_threshold_radians - abs(theta)) / env.theta_threshold_radians - 0.5
            # r = r1 + r2
            tot_reward += r  # accumulate the episode reward
            if done:
                t = 1
            else:
                t = 0
            Agent.put(s0, a0, r, t, s1)  # store the transition in the replay buffer
            s0 = s1
            Agent.update_parameters()
            if done:
                # Incremental (running-mean) update of the average reward.
                average_reward = average_reward + 1 / (i_episode + 1) * (
                        tot_reward - average_reward)
                print('Episode ', i_episode, 'tot_time: ', tot_time,
                      ' tot_reward: ', tot_reward, ' average_reward: ',
                      average_reward)
                break
        if i_episode % target_update == 0:
            Agent.target_net.load_state_dict(Agent.net.state_dict())

有一個點需要注意,網上有些DQN的實現沒有考慮終止狀態,所以需要修改Reward才能達到好的效果。在考慮終止狀態后,使用原始的reward就可以學習。

Reinforce:

參考:

思路及代碼:

https://blog.csdn.net/qq_37266917/article/details/109855244

個人理解:

Reinforce是一種策略梯度算法,對參數化的策略梯度算法進行梯度上升。需要注意網絡不能太復雜,不然會過擬合導致很難學習。通過策略梯度定理,我們知道了怎么進行梯度上升。概率前面的回報可以看成梯度上升的幅度,即回報越大提升的概率也越多。所以在Policy Gradient中引入的基線(baseline),以防止某些非最優的動作被選擇之后概率變得過大(雖然在樣本足夠多的時候這個問題也能解決)

神經網絡的loss是 t時刻的回報 * t時刻的動作的概率取對數。以及要取負,因為神經網絡是梯度下降,最小化loss,取負數就是最大化回報。

 

import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import gym
import random
import numpy as np
from torch.distributions import Categorical
from collections import deque
from collections import namedtuple

# Hyperparameters for the REINFORCE listing.
GAMMA = 1.0  # discount factor (undiscounted returns here)
lr = 0.1  # NOTE(review): unused; the optimizer below hard-codes lr=0.01
EPSION = 0.9  # NOTE(review): unused in this listing
buffer_size = 10000  # NOTE(review): unused (REINFORCE needs no replay buffer)
batch_size = 32  # NOTE(review): unused in this listing
num_episode = 100000  # number of training episodes
target_update = 10  # NOTE(review): unused (no target network here)
EPS_START = 0.9  # NOTE(review): EPS_* constants are unused in this listing
EPS_END = 0.05
EPS_DECAY = 200

class Policy(nn.Module):
    """Policy network: maps a batch of states to action probabilities."""

    def __init__(self, input_size, hidden_size, output_size):
        super(Policy, self).__init__()
        self.Linear1 = nn.Linear(input_size, hidden_size)
        self.Linear1.weight.data.normal_(0, 0.1)
        self.Linear3 = nn.Linear(hidden_size, output_size)
        self.Linear3.weight.data.normal_(0, 0.1)

    def forward(self, x):
        """Return per-action probabilities (softmax over the last layer)."""
        hidden = F.relu(self.Linear1(x))
        return F.softmax(self.Linear3(hidden), dim=1)

class Reinforce(object):
    """REINFORCE (Monte-Carlo policy gradient) agent."""

    def __init__(self, input_size, hidden_size, output_size):
        self.net = Policy(input_size, hidden_size, output_size)
        self.optim = optim.Adam(self.net.parameters(), lr=0.01)

    def select_action(self, s):
        """Sample an action from the policy; return (action, log-probability)."""
        state = torch.Tensor(s).unsqueeze(0)
        dist = Categorical(self.net(state))
        action = dist.sample()
        return action.item(), dist.log_prob(action)

    def update_parameters(self, rewards, log_probs):
        """One gradient step maximising sum_t G_t * log pi(a_t | s_t).

        G_t (the discounted return from step t) is accumulated backwards;
        the loss is negated because the optimizer minimises.
        """
        ret = 0
        loss = 0
        for t in reversed(range(len(rewards))):
            ret = rewards[t] + GAMMA * ret
            loss = loss - ret * log_probs[t]
        self.optim.zero_grad()
        loss.backward()
        self.optim.step()

if __name__ == '__main__':
    env = gym.make('CartPole-v0')
    average_reward = 0  # running mean of episode rewards
    Agent = Reinforce(env.observation_space.shape[0], 16, env.action_space.n)
    # scores_deque = deque(maxlen=100)
    # scores = []
    for i_episode in range(1, num_episode + 1):
        s = env.reset()
        log_probs = []  # log pi(a_t | s_t) for every step of the episode
        rewards = []  # per-step rewards of the episode
        while True:
            env.render()
            a, prob = Agent.select_action(s)
            s1, r, done, _ = env.step(a)
            # scores_deque.append(sum(rewards))
            # scores.append(sum(rewards))
            log_probs.append(prob)
            rewards.append(r)
            s = s1
            if done:
                # Incremental update of the running-mean episode reward.
                average_reward = average_reward + (1 / (i_episode + 1)) * (np.sum(rewards) - average_reward)
                if i_episode % 100 == 0:
                    # print('Episode {}\t Average Score: {:.2f}'.format(i_episode, np.mean(scores_deque)))
                    print('episode: ', i_episode, "tot_rewards: ", np.sum(rewards), 'average_rewards: ', average_reward)
                break
        Agent.update_parameters(rewards, log_probs)

 

DDPG:

參考:

思路:https://www.cnblogs.com/pinard/p/10345762.html

實現:https://zhuanlan.zhihu.com/p/99406809

個人理解:DDPG算法采用了Actor-Critic框架,像是DQN和Policy Gradient的結合。在DDPG中,Actor輸出的是一個具體的動作,而不是動作的概率分布,Critic輸出的是動作的Q值。Actor和Critic都需要一個Target網絡,需要ReplayBuffer打破相關性。網上我沒找到用DDPG和Pytorch解決單臂桿問題的代碼,所以我的解決方法可能不是最好的。因為單臂桿的動作是離散的2個(0,1),最開始我給Actor設置了2個輸出並用argmax決定是哪個。后面發現argmax沒有梯度,於是我將輸出改為了一個,並套了一層sigmoid,輸出小於0.5當0算,大於0.5當1算。Critic的loss和DQN類似,都是target和eval的完全平方差。Actor的loss需要自己先輸出a,再用critic得到估值,求平均值,再取負。

代碼:

import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import gym
import random
import numpy as np
from collections import namedtuple
import math

# Hyperparameters for the DDPG listing.
GAMMA = 0.9  # discount factor
lr = 0.1  # NOTE(review): unused; both optimizers below hard-code lr=0.01
EPSION = 0.9  # NOTE(review): unused in this listing
buffer_size = 10000  # capacity of the experience replay buffer
batch_size = 32  # minibatch size sampled from the replay buffer
num_episode = 100000  # number of training episodes
target_update = 10  # NOTE(review): unused (soft updates are used instead)
EPS_START = 0.9  # NOTE(review): EPS_* constants are unused in this listing
EPS_END = 0.05
EPS_DECAY = 200
tau = 0.02  # soft (Polyak) update rate for the target networks

# Deterministic actor: maps a state to a single value in (0, 1); the caller
# rounds it to the discrete CartPole action 0 or 1.
class Actor(nn.Module):
    def __init__(self, input_size, hidden_size, output_size):
        super(Actor, self).__init__()
        self.Linear1 = nn.Linear(input_size, hidden_size)
        self.Linear1.weight.data.normal_(0, 0.1)
        self.Linear3 = nn.Linear(hidden_size, output_size)
        self.Linear3.weight.data.normal_(0, 0.1)

    def forward(self, x):
        hidden = F.relu(self.Linear1(x))
        return torch.sigmoid(self.Linear3(hidden))

class Critic(nn.Module):
    """DDPG critic: scores a (state, action) pair with a scalar Q-value."""

    def __init__(self, input_size, hidden_size, output_size):
        super(Critic, self).__init__()
        self.Linear1 = nn.Linear(input_size, hidden_size)
        self.Linear1.weight.data.normal_(0, 0.1)
        self.Linear3 = nn.Linear(hidden_size, output_size)
        self.Linear3.weight.data.normal_(0, 0.1)

    def forward(self, s, a):
        """Concatenate state and action along dim 1, then return Q(s, a)."""
        joint = torch.cat([s, a], dim=1)
        hidden = F.relu(self.Linear1(joint))
        return self.Linear3(hidden)

# Container for one DDPG transition; note the training loop stores 1 - done
# in the 'done' slot (1 while running, 0 at episode end).
Transition = namedtuple('Transition', ('state', 'action', 'reward', 'next_state', 'done'))


class ReplayMemory(object):
    """Ring buffer of past transitions for DDPG experience replay."""

    def __init__(self, capacity):
        self.capacity = capacity  # maximum number of stored transitions
        self.memory = []
        self.position = 0  # next slot to overwrite once full

    def push(self, *args):
        """Add a transition, evicting the oldest entry once at capacity."""
        if len(self.memory) >= self.capacity:
            self.memory[self.position] = Transition(*args)
        else:
            self.memory.append(Transition(*args))
        self.position = (self.position + 1) % self.capacity

    def sample(self, batch_size):
        """Uniformly sample batch_size stored transitions."""
        return random.sample(self.memory, batch_size)

    def __len__(self):
        return len(self.memory)


class DDPG(object):
    """DDPG agent: actor/critic pairs with target networks and a replay buffer."""

    def __init__(self, input_size, action_shape, hidden_size, output_size):
        self.actor = Actor(input_size, hidden_size, action_shape)
        self.actor_target = Actor(input_size, hidden_size, action_shape)
        # Critic input is state + action concatenated.
        self.critic = Critic(input_size + action_shape, hidden_size, action_shape)
        self.critic_target = Critic(input_size + action_shape, hidden_size, action_shape)
        self.actor_optim = optim.Adam(self.actor.parameters(), lr=0.01)
        self.critic_optim = optim.Adam(self.critic.parameters(), lr=0.01)

        # Targets start as exact copies of the online networks.
        self.actor_target.load_state_dict(self.actor.state_dict())
        self.critic_target.load_state_dict(self.critic.state_dict())
        self.buffer = ReplayMemory(buffer_size)
        self.loss_func = nn.MSELoss()
        self.steps_done = 0

    def put(self, s0, a0, r, s1, done):
        # NOTE(review): the training loop passes 1 - done here, so this slot
        # is really a continuation flag (1 while running, 0 at episode end).
        self.buffer.push(s0, a0, r, s1, done)

    def select_action(self, state):
        # Returns the actor's raw tensor output (still attached to the graph);
        # the caller detaches and rounds it to a discrete action.
        state = torch.Tensor(state)
        a = self.actor(state)
        return a

    def update_parameters(self):
        """One critic TD step, one actor ascent step, then soft target updates."""
        if self.buffer.__len__() < batch_size:
            return
        samples = self.buffer.sample(batch_size)
        batch = Transition(*zip(*samples))
        # Convert the transposed tuples into tensors; actions are flattened
        # into a (B, 1) column so they can be concatenated with states.
        state_batch = torch.Tensor(batch.state)
        action_batch = torch.Tensor(batch.action).unsqueeze(0).view(-1, 1)
        reward_batch = torch.Tensor(batch.reward)
        next_state_batch = torch.Tensor(batch.next_state)
        done_batch = torch.Tensor(batch.done)
        # --- critic update ---
        next_action_batch = self.actor_target(next_state_batch).unsqueeze(0).detach().view(-1, 1)

        r_eval = self.critic(state_batch, action_batch)
        # done_batch holds 1 - done, so terminal next-states contribute nothing.
        # NOTE(review): reward (B,) + target view (1, B) broadcasts to (1, B);
        # both sides are squeezed to (B,) below, so the MSE still matches
        # element-wise — fragile but consistent. Confirm before refactoring.
        r_target = reward_batch + GAMMA * self.critic_target(next_state_batch, next_action_batch).detach().view(1, -1) * done_batch
        r_eval = torch.squeeze(r_eval)
        r_target = torch.squeeze(r_target)
        loss = self.loss_func(r_eval, r_target)
        self.critic_optim.zero_grad()
        loss.backward()
        self.critic_optim.step()
        # --- actor update: maximise the critic's score of the actor's action ---
        a = self.actor(state_batch).unsqueeze(0).view(-1, 1)
        loss = -torch.mean(self.critic(state_batch, a))
        self.actor_optim.zero_grad()
        loss.backward()
        self.actor_optim.step()
        # Soft (Polyak) update: target <- (1 - tau) * target + tau * online.
        def soft_update(net_target, net):
            for target_param, param in zip(net_target.parameters(), net.parameters()):
                target_param.data.copy_(target_param.data * (1.0 - tau) + param.data * tau)

        soft_update(self.actor_target, self.actor)
        soft_update(self.critic_target, self.critic)



if __name__ == '__main__':
    env = gym.make('CartPole-v0')
    Agent = DDPG(env.observation_space.shape[0], 1, 16, env.action_space.n)
    average_reward = 0  # running mean of episode lengths
    for i_episode in range(num_episode):
        s0 = env.reset()
        tot_reward = 0
        tot_time = 0
        while True:
            env.render()
            a0 = Agent.select_action(s0)
            # The actor emits a value in (0, 1); round it to a discrete action.
            s1, r, done, _ = env.step(round(a0.detach().numpy()[0]))
            tot_time += r
            tot_reward += r
            Agent.put(s0, a0, r, s1, 1 - done)  # the terminal flag matters; learning is very hard without it
            s0 = s1
            Agent.update_parameters()
            if done:
                average_reward = average_reward + 1 / (i_episode + 1) * (tot_time - average_reward)
                # if i_episode % 100 == 0:
                print('Episode ', i_episode, 'tot_time: ', tot_time, ' tot_reward: ', tot_reward, ' average_reward: ', average_reward)
                break
        # if i_episode % target_update == 0:
        #     Agent.target_net.load_state_dict(Agent.net.state_dict())

 

PPO:

參考:

PPO算法流程及思想:

https://blog.csdn.net/qq_30615903/article/details/86308045

https://www.jianshu.com/p/9f113adc0c50

PPO算法的實現:

https://blog.csdn.net/weixin_42165585/article/details/112362125

個人理解:

PPO算法也是Actor-Critic架構,但是與DDPG不同,PPO為on-policy算法,所以不需要設計target網絡,也不需要ReplayBuffer, 並且Actor和Critic的網絡參數可以共享以便加快學習。PPO引入了重要度采樣,使得每個episode的數據可以被多訓練幾次(實際的情況中,采樣可能非常耗時)從而節省時間,clip保證了更新的幅度不會太大。

import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from collections import namedtuple
import random
import gym
import math

# Hyperparameters for the PPO listing.
lr = 0.0005  # learning rate for the shared actor-critic optimizer
Capacity = 10000  # episode buffer capacity (cleared after each episode)
num_epidose = 10000  # number of episodes (NOTE(review): name typo, "episode")
Gamma = 0.98  # discount factor
lmbda = 0.95  # GAE lambda (NOTE(review): declared but unused below)
eps_clip = 0.1  # PPO clipping range for the importance ratio

class Net(nn.Module):
    """Shared-trunk actor-critic network for PPO."""

    def __init__(self, input_size, hidden_size, output_size):
        super(Net, self).__init__()
        self.Linear1 = nn.Linear(input_size, hidden_size)  # shared trunk
        self.Linear_actor = nn.Linear(hidden_size, output_size)
        self.Linear_critic = nn.Linear(hidden_size, 1)

    def actor_forward(self, s, dim):
        """Return action probabilities; `dim` selects the softmax dimension."""
        trunk = F.relu(self.Linear1(s))
        return F.softmax(self.Linear_actor(trunk), dim=dim)

    def critic_forward(self, s):
        """Return the state-value estimate V(s)."""
        trunk = F.relu(self.Linear1(s))
        return self.Linear_critic(trunk)

# One PPO transition; 'rate' is the sampling-time probability of the chosen
# action (the denominator of the importance ratio), 'done' stores 1 - done.
Transition = namedtuple('Transition', ('state', 'action', 'reward', 'next_state', 'rate', 'done'))


class ReplayBuffer(object):
    """Episode buffer for PPO; filled during a rollout, then cleared."""

    def __init__(self, capacity):
        self.capacity = capacity  # maximum number of stored transitions
        self.memory = []
        self.position = 0  # next slot to overwrite once full

    def push(self, *args):
        """Append a transition, wrapping around once capacity is reached."""
        if len(self.memory) >= self.capacity:
            self.memory[self.position] = Transition(*args)
        else:
            self.memory.append(Transition(*args))
        self.position = (self.position + 1) % self.capacity

    def sample(self, batch_size):
        """Uniformly sample batch_size stored transitions."""
        return random.sample(self.memory, batch_size)

    def __len__(self):
        return len(self.memory)

    def clean(self):
        """Discard all stored transitions (called after each PPO update)."""
        self.position = 0
        self.memory = []

class PPO(object):
    """PPO agent with a shared actor-critic network and clipped surrogate loss."""

    def __init__(self, input_size, hidden_size, output_size):
        super(PPO, self).__init__()
        self.net = Net(input_size, hidden_size, output_size)
        self.optim = optim.Adam(self.net.parameters(), lr=lr)
        self.buffer = ReplayBuffer(capacity=Capacity)

    def act(self, s, dim):
        """Return action probabilities for state(s) s (softmax over `dim`)."""
        s = torch.Tensor(s)
        prob = self.net.actor_forward(s, dim)
        return prob

    def critic(self, s):
        """Return the critic's value estimate V(s)."""
        return self.net.critic_forward(s)

    def put(self, s0, a0, r, s1, rate, done):
        """Store one transition; `rate` is the behaviour-policy probability of
        a0 and `done` is 1 while the episode continues, 0 at termination."""
        self.buffer.push(s0, a0, r, s1, rate, done)

    def make_batch(self):
        """Convert the whole episode buffer into batched tensors."""
        batch = Transition(*zip(*self.buffer.memory))
        # BUG FIX: states are 4-dimensional vectors, so they must not be
        # flattened with view(-1, 1) as the original version did.
        state_batch = torch.Tensor(batch.state)
        action_batch = torch.LongTensor(batch.action).view(-1, 1)
        reward_batch = torch.Tensor(batch.reward).view(-1, 1)
        next_state_batch = torch.Tensor(batch.next_state)
        rate_batch = torch.Tensor(batch.rate).view(-1, 1)
        done_batch = torch.LongTensor(batch.done).view(-1, 1)
        return state_batch, action_batch, reward_batch, next_state_batch, done_batch, rate_batch

    def update_parameters(self):
        """Run a few epochs of clipped-surrogate PPO updates on the buffer.

        The original version duplicated the batch-building statements inline;
        they are consolidated into a single make_batch() call here.
        """
        state_batch, action_batch, reward_batch, next_state_batch, done_batch, rate_batch = self.make_batch()
        for _ in range(3):
            # TD(0) target; done_batch (1 = not terminal) masks the bootstrap.
            td_target = reward_batch + Gamma * self.critic(next_state_batch) * done_batch
            delta = td_target - self.critic(state_batch)
            delta = delta.detach().numpy()

            # Backward accumulation of the advantage estimates.
            advantage_list = []
            advantage = 0.0
            for delta_t in delta[::-1]:
                advantage = Gamma * advantage + delta_t
                advantage_list.append(advantage)
            advantage_list.reverse()
            advantage = torch.Tensor(advantage_list)

            prob = self.act(state_batch, 1).squeeze(0)
            prob_a = prob.gather(1, action_batch)
            # Importance ratio pi_new(a|s) / pi_old(a|s), computed in log space.
            ratio = torch.exp(torch.log(prob_a) - torch.log(rate_batch))
            surr1 = ratio * advantage
            surr2 = torch.clamp(ratio, 1 - eps_clip, 1 + eps_clip) * advantage
            # Clipped policy objective plus a value-function regression term.
            loss = -torch.min(surr1, surr2) + F.smooth_l1_loss(self.critic(state_batch), td_target.detach())
            self.optim.zero_grad()
            loss.mean().backward()
            self.optim.step()



if __name__ == '__main__':
    env = gym.make('CartPole-v0')
    Agent = PPO(env.observation_space.shape[0], 256, env.action_space.n)
    average_reward = 0  # running mean of episode rewards
    for i_episode in range(num_epidose):
        s0 = env.reset()
        tot_reward = 0
        while True:
            env.render()
            # Sample an action from the current policy's distribution.
            prob = Agent.act(torch.from_numpy(s0).float(), 0)
            a0 = int(prob.multinomial(1))
            s1, r, done, _ = env.step(a0)
            rate = prob[a0].item()  # behaviour-policy probability, used later in the PPO ratio
            Agent.put(s0, a0, r, s1, rate, 1 - done)
            s0 = s1
            tot_reward += r
            if done:
                average_reward = average_reward + 1 / (i_episode + 1) * (
                        tot_reward - average_reward)
                if i_episode % 20 == 0:
                    print('Episode ', i_episode,
                      ' tot_reward: ', tot_reward, ' average_reward: ',
                      average_reward)
                break
        # Agent.train_net()
        Agent.update_parameters()  # several PPO epochs over this episode's data
        Agent.buffer.clean()  # on-policy: discard the data after the update

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM