0x00 任务
通过强化学习算法完成倒立摆任务,控制倒立摆在一定范围内摆动。
0x01 设置jupyter登录密码
jupyter notebook --generate-config
jupyter notebook password (会输入两次密码,用来验证)
jupyter notebook 登录
0x02 创建python note
0x03 代码
# 声明包
import numpy as np
import matplotlib.pyplot as plt
%matplotlib inline
import gym
# 声明绘图功能
from JSAnimation.IPython_display import display_animation
from matplotlib import animation
from IPython.display import display
def display_frames_as_gif(frames):
plt.figure(figsize=(frames[0].shape[1]/72.0,frames[0].shape[0]/72.0),dpi=72)
patch=plt.imshow(frames[0])
plt.axis("off")
def animate(i):
patch.set_data(frames[i])
anim=animation.FuncAnimation(plt.gcf(),animate,frames=len(frames),interval=50)
anim.save('move_carpole.mp4') # 保存动画
display(display_animation(anim,default_mode='loop'))
# 随机移动CartPole
frames=[]
env=gym.make('CartPole-v0')
observation=env.reset() # 重置环境
for step in range(0,200):
frames.append(env.render(mode='rgb_array')) # 加载各个时刻图像到帧
action=np.random.choice(2) # 随机返回: 0 小车向左,1 小车向右
gym.logger.set_level(40)
observation,reward,done,info=env.step(action) # 执行动作
运行后
移动 Caprpole的代码并不重要,重要的是最后一行observation,reward,done,info=env.step(action)
reward 是 即时奖励,若执行了action后,小车位置在+-2.4范围之内而且杆的倾斜成都没有超过20.9°,则设置奖励为1.相反,若小车移出+-2.4范围或者杆倾斜超过了20.9°的话,则奖励为0。退出时 done是一个变量。若为结束状态 则为true
这里代码忽略了done, info变量保存调试信息。
最后使用display_frames_as_gif(frames) 函数去保存我们的gif
# 保存并绘制视频
display_frames_as_gif(frames)
可正常保存视频
CartPole的状态
之前讨论的迷宫问题中,状态指的是每个格子的编号,由单个变量表示,0~8,然而倒立摆具有更复杂的状态定义。
CartPole的状态存储在observation中,变量observarion是4个变量组成的列表,每个变量的内容如
小车位置 -2.4~2.4
小车速度 -∞~+∞
杆的角度 -41.8°~+41.8°
杆的角速度 -∞~+∞
因为变量是连续值,如果想要通过表格的形式来表达Q函数,就需要将他们进行离散化
比如使用0~5来标记变量的连续值
-2.4~-1.6=0
-1.6~-0.8=1
依次类推
则总共有6的4次方总组合 1296种类型 数字 表示 CartPole的状态
而这个时候小车的方向只有向左和向右
所以,可以用1296行x2列的表格来表示Q函数
算法实现
- 变量设置
import numpy as np
import matplotlib.pyplot as plt
%matplotlib inline
import gym
# 变量设定
ENV='CartPole-v0' # 设置任务名
NUM_DIZITIZED=6 # 设置离散值个数
# 尝试运行 CartPole
env=gym.make(ENV) # 设置要执行的任务
observarion=env.reset() # 环境初始化
- 求取用于离散化的阙值
# 求取用于离散化的阙值
def bins(clip_min,clip_max,num):
return np.linespace(clip_min,clip_max,num+1)[1:-1] # 返回[-1.6,-0.8,0,0,0.8,1.6]
-∞~-1.6=0 -1.6~0.8=1
依次类推
- 创建函数 根据获得的阙值对连续变量进行离散化
def digitize_state(observation):
cart_pos,cart_v,pole_angle,pole_v=observation
digitized=[
np.digitize(cart_pos,bins=bins(-2.4,2.4,NUM_DIZITIZED)),
np.digitize(cart_v,bins=bins(-3.0,3.0,NUM_DIZITIZED)),
np.digitize(pole_angle,bins=bins(-0.5,0.5,NUM_DIZITIZED)),
np.digitize(pole_v,bins=bins(-2.0,2.0,NUM_DIZITIZED))
]
return sum([x*(NUM_DIZITIZED)**i) for i,x in enumerate(digitized)])
以6进制进行计算 如果 存在一个离散值(1,2,3,4) 则求得当前状态值为 160+2*61+362+4*63=985
- Q学习实现
这里需要定义实现类,主要有三个类 Agent Brain 和 Environmet
Agent类表示小推车对象,主要有2个函数,更新Q函数,和确定下一步动作函数
Agent中有一个Brain类的对象作为成员变量。
Brain类可认为是Agent的大脑,通过Q表来实现Q学习,主要有4个函数 bin digitize_state 用来离散化Agent观察到的observation
函数update_Q_table来更新Q表
函数decision_action 来确定来自Q表的动作。
为什么需要将Agent和Brain类分开》? 因为如果使用深度强化学习,将表格型Q改成深度强化学习时只需要改变Brain类就行了。
Environment类是OpenAI Gym的执行环境,执行CartPole环境的是run函数
- start
首先我们需要决定要执行的值动作,所以 Agent将当前状态 observation_t传给Brain ,Brain 离散化状态再根据Q表来确定动作,并将确定的动作返回给Agent,
之后是动作的实际执行环境步骤,Agent将动作action_t传递给Environment,Environment执行动作action_t并将执行后的状态observation_t+1和即时奖励 reward+1 返回给Agent
再更新Q 表, Agent将当前状态observation_t 执行动作 action_t 和执行动作后的observation_t+1 即时奖励reward_t+1传回给Brain,Brain更新Q表,这4个变量综合起来被称为transition
之后 重复该过程就行了,因为获得最大价值的方式只有一种,所以通过Q学习不断拟合,最后会形成唯一解。
完整代码
#!/usr/bin/env python
# coding: utf-8
# In[ ]:
# 声明包
import numpy as np
import matplotlib.pyplot as plt
get_ipython().run_line_magic('matplotlib', 'inline')
import gym
# In[17]:
# 声明绘图功能
from JSAnimation.IPython_display import display_animation
from matplotlib import animation
from IPython.display import display
def display_frames_as_gif(frames):
plt.figure(figsize=(frames[0].shape[1]/72.0,frames[0].shape[0]/72.0),dpi=72)
patch=plt.imshow(frames[0])
plt.axis("off")
def animate(i):
patch.set_data(frames[i])
anim=animation.FuncAnimation(plt.gcf(),animate,frames=len(frames),interval=50)
anim.save('move_carpole.mp4') # 保存动画
display(display_animation(anim,default_mode='loop'))
# In[18]:
# 变量设定
ENV='CartPole-v0' # 设置任务名
NUM_DIZITIZED=6 # 设置离散值个数
GAMMA=0.99 # 时间折扣率
ETA=0.5 # 学习率
MAX_STEPS=200 # 一次实验中的步数
NUM_EPISODES=1000 # 最大实验次数
# 尝试运行 CartPole
env=gym.make(ENV) # 设置要执行的任务
observarion=env.reset() # 环境初始化
# In[19]:
# 定义Agent类
class Agent:
def __init__(self,num_states,num_actions):
self.brain=Brain(num_states,num_actions) # 创建了Brain类的对象,构造方法中有当前状态和动作
# 为智能体创建大脑以做出决策
def update_Q_function(self,observation,action,reward,observation_next):
self.brain.update_Q_table(observation,action,reward,observation_next)
# 更新Brain类中的Q函数
def get_action(self,observation,step):
action=self.brain.decide_action(observation,step)
return action
# 动作的确定
# In[20]:
# 定义Brain类
class Brain:
def __init__(self,num_states,num_actions):
self.num_actions=num_actions # CartPole的动作 向左或者向右
self.q_table=np.random.uniform(low=0,high=1,size=(NUM_DIZITIZED**num_states,num_actions))
# 初始化Q表,行为将状态转换成数字得到的分割数,列为动作数
def bins(self,clip_min,clip_max,num):
return np.linspace(clip_min,clip_max,num+1)[1:-1] # 返回[-1.6,-0.8,0,0,0.8,1.6]
# 求取用于离散化的阙值
def digitize_state(self,observation):
cart_pos,cart_v,pole_angle,pole_v=observation
digitized=[
np.digitize(cart_pos,bins=self.bins(-2.4,2.4,NUM_DIZITIZED)),
np.digitize(cart_v,bins=self.bins(-3.0,3.0,NUM_DIZITIZED)),
np.digitize(pole_angle,bins=self.bins(-0.5,0.5,NUM_DIZITIZED)),
np.digitize(pole_v,bins=self.bins(-2.0,2.0,NUM_DIZITIZED))
]
return sum([x*(NUM_DIZITIZED**i) for i,x in enumerate(digitized)])
# 将连续值转成离散值
def update_Q_table(self,observation,action,reward,observation_next):
# 更新Q表
state=self.digitize_state(observation) # 状态离散化
state_next=self.digitize_state(observation_next) # 下一个状态离散化
Max_Q_next=max(self.q_table[state_next][:]) # 求Q表中下一个状态的最大值 即向右或向左方向的状态值
self.q_table[state,action]=self.q_table[state,action]+ETA*(reward+GAMMA*Max_Q_next-self.q_table[state,action])
# 若要保证Q表几乎不变,则下一跳中状态最大值*时间折扣率+0的奖励需要几乎等于当前状态。
# 也就是说在不断拟合一个定值,最终收敛于该值。
# 为什么会拟合该值?因为该范围值是这个物体运动的大范围值的子集,也就是说会一直重复这个范围值,如果次数够多,并且每次都能通过算法将在这个小范围值之外的区间缩小,
# 那么可通过这种方法进行拟合,训练量足够多,则拟合的范围值越接近我们想要的范围值。
print(self.q_table[state,action])
def decide_action(self,observation,episode):
state=self.digitize_state(observation)
epsilon=0.5*(1/(episode+1))
if epsilon<=np.random.uniform(0,1):
action=np.argmax(self.q_table[state][:]) # 获取对应状态最大值索引。
else:
action=np.random.choice(self.num_actions) # 随机返回0,1动作
return action
# In[21]:
# 执行环境类
class Environment:
def __init__(self):
self.env=gym.make(ENV) # 设置要执行的任务
num_states=self.env.observation_space.shape[0] # 获取任务状态个数
num_actions=self.env.action_space.n # 获取CartPole的动作数 为 2
self.agent=Agent(num_states,num_actions) # 创建在环境中行动的Agent
def run(self):
complete_episodes=0 # 持续195步或者更多
is_episode_final=False # 最终实验标志
frames=[] # 用来存储视频图象的变量
for episode in range(NUM_EPISODES):
observation=self.env.reset() # 环境初始化
for step in range(MAX_STEPS): # 每个回合的循环
if is_episode_final is True: # 将最终实验各个时刻图像添加到帧
frames.append(self.env.render(mode='rgb_array'))
# 求动作
action=self.agent.get_action(observation,episode)
# 根据执行动作找到 s_T ,r_t
observation_next,_,done,_=self.env.step(action) # 不用regain info
# 给予奖励
if done:
if step<195:
reward=-1 # 半途摔倒给予-1作为惩罚
complete_episodes=0 # 站立超过195 重置
else:
reward=1 # 完成给予奖励
complete_episodes+=1 # 更新
else:
reward=0 # 途中奖励为0
# 使用step_1 的状态 observation_next更新Q函数
self.agent.update_Q_function(observation,action,reward,observation_next)
# observation 更新
observation=observation_next
# 结束时处理
if done:
print('{0} Episode: Finished after {1} time steps:'.format(episode,step+1))
break
if is_episode_final is True: # 最后一次实验保存
display_frames_as_gif(frames)
break
if complete_episodes>=10:
print('10回合连续成功')
is_episode_final=True
# In[ ]:
# 主函数调用
if __name__=="__main__":
cartpole_env=Environment()
cartpole_env.run()
# In[ ]:
# In[ ]:
运行结果
0x04 强化学习之Q学习的原理(重点)
观察代码 self.q_table[state,action]=self.q_table[state,action]+ETA(reward+GAMMAMax_Q_next-self.q_table[state,action]) ,刚开始看时,一脸懵逼,直到想通了某个点。
首先我们需要清楚在大量的数据面前,能够满足我们想要的最好的策略 只有一条,有些时候我们可以自己求得该策略,比如迷宫问题,我们可以轻松做到,这里的倒立摆问题我们也能轻松做到,但是小球消方块呢? 我们人类几乎不能在很短时间做出判断,然后消除掉所有的方块,但是机器能。为什么?
满足最优策略只有一条,大数据训练只是为了让我们的策略最终拟合成为最优策略
还是拿这段代码来说 self.q_table[state,action]=self.q_table[state,action]+ETA(reward+GAMMAMax_Q_next-self.q_table[state,action]) Q表的更新是当前Q表+变化值。
所以Q表的更新量其实就是 ETA*(reward+GAMMA*Max_Q_next-self.q_table[state,action])
ETA是学习率。 reward是奖励,这里可以认为是0, GAMMA是时间折扣率为0.99 接近为1 self.q_table[state,action] 为当前Q表,记录了当前状态和当前的方向。
当我们设置变化值为很小时,最终实现Q表几乎不变,此时Q表代表了我们的最优策略。 但是为什么,为什么它就能拟合到最优,而不是别的?每次拟合的过程是什么?
概率问题。
用epsilon贪婪法先进行随机运动,目的是防止智能体一开始就一直朝一个方向进行拟合,比如全部向右,这样小车就飞出去了。之后通过逐步减小epsilon值让小车朝获得最大价值的方向运动,如果成功朝向了最大价值方向运动,则向该方向运动的概率变大,宏观上表现为变化量减小。也就是到达目标的步数减少,而步数的减少使得数据范围缩小,而我们目标数据范围不变,所以相当于我们目标占比变大了,而目标占比变大,使得智能体朝该方向运动概率也变大,周而复始,最终达到几乎完全拟合,变化量忽略不计。