From: https://zhuanlan.zhihu.com/p/21477488
From: OpenAI Gym — a simulated annealing solution to CartPole
Env setting: https://gym.openai.com/docs/
CartPole-v0
A pole is attached by an un-actuated joint to a cart, which moves along a frictionless track.
The system is controlled by applying a force of +1 or -1 to the cart. The pendulum starts upright, and the goal is to prevent it from falling over.
A reward of +1 is provided for every timestep that the pole remains upright.
The episode ends when the pole is more than 15 degrees from vertical, or the cart moves more than 2.4 units from the center.
Environment
Observation
Type: Box(4)
Num | Observation | Min | Max |
---|---|---|---|
0 | Cart Position | -2.4 | 2.4 |
1 | Cart Velocity | -Inf | Inf |
2 | Pole Angle | ~ -0.418 rad (~ -24°) | ~ 0.418 rad (~ 24°) |
3 | Pole Velocity At Tip | -Inf | Inf |
Actions
Type: Discrete(2)
Num | Action |
---|---|
0 | Push cart to the left |
1 | Push cart to the right |
Note: the amount by which the velocity is reduced or increased is not fixed; it depends on the angle the pole is pointing, because the center of gravity of the pole changes the amount of energy needed to move the cart underneath it.
Reward
Reward is 1 for every step taken, including the termination step
Starting State
All observations are assigned a uniform random value between ±0.05
Episode Termination
- Pole Angle is more than ±12° (0.209 rad)
- Cart Position is more than ±2.4 (center of the cart reaches the edge of the display)
- Episode length is greater than 200
Solved Requirements
Considered solved when the average reward is greater than or equal to 195.0 over 100 consecutive trials.
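A minimal sketch of checking that criterion during training; the 100-episode window and the 195.0 threshold come from the definition above, while the function name and the `episode_rewards` list are illustrative:

```python
import numpy as np

def is_solved(episode_rewards, window=100, threshold=195.0):
    """Return True once the mean reward over the last `window` episodes
    reaches the CartPole-v0 threshold of 195.0."""
    if len(episode_rewards) < window:
        return False
    return np.mean(episode_rewards[-window:]) >= threshold
```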
From: https://gym.openai.com/docs/
This first snippet only runs a single episode, taking a random action at every step and never checking whether the episode has ended:
```python
import gym

env = gym.make('CartPole-v0')
env.reset()  # start here
for _ in range(1000):
    env.render()
    env.step(env.action_space.sample())  # take a random action
```
Observations
If we ever want to do better than take random actions at each step, it'd probably be good to actually know what our actions are doing to the environment.
The environment's `step` function returns exactly what we need. In fact, `step` returns four values. These are:

- `observation` (object): an environment-specific object representing your observation of the environment. For example, pixel data from a camera, joint angles and joint velocities of a robot, or the board state in a board game.
- `reward` (float): amount of reward achieved by the previous action. The scale varies between environments, but the goal is always to increase your total reward.
- `done` (boolean): whether it's time to `reset` the environment again. Most (but not all) tasks are divided up into well-defined episodes, and `done` being `True` indicates the episode has terminated. (For example, perhaps the pole tipped too far, or you lost your last life.)
- `info` (dict): diagnostic information useful for debugging. It can sometimes be useful for learning (for example, it might contain the raw probabilities behind the environment's last state change). However, official evaluations of your agent are not allowed to use this for learning.
This is just an implementation of the classic "agent-environment loop". Each timestep, the agent chooses an `action`, and the environment returns an `observation` and a `reward`.

The process gets started by calling `reset`, which returns an initial `observation`. So a more proper way of writing the previous code would be to respect the `done` flag:
```python
import gym

env = gym.make('CartPole-v0')
for i_episode in range(20):
    observation = env.reset()
    for t in range(100):
        env.render()
        print(observation)
        action = env.action_space.sample()
        observation, reward, done, info = env.step(action)
        if done:
            print("Episode finished after {} timesteps".format(t + 1))
            break
```
Spaces
In the examples above, we've been sampling random actions from the environment's action space. But what actually are those actions? Every environment comes with first-class `Space` objects that describe the valid actions and observations:
```python
import gym

env = gym.make('CartPole-v0')
print(env.action_space)
#> Discrete(2)
print(env.observation_space)
#> Box(4,)
```
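Spaces can also be sampled and queried for membership; a short sketch along the lines of the official docs (the `Discrete(8)` space here is purely illustrative):

```python
import gym
from gym import spaces

space = spaces.Discrete(8)   # a set with 8 elements {0, 1, ..., 7}
x = space.sample()           # draw a random element
assert space.contains(x)
assert space.n == 8

# The CartPole spaces can be inspected the same way:
env = gym.make('CartPole-v0')
print(env.action_space.n)          # 2
print(env.observation_space.high)  # upper bounds of the Box
print(env.observation_space.low)   # lower bounds of the Box
```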
Environments
`gym`'s main purpose is to provide a large collection of environments that expose a common interface and are versioned to allow for comparisons. You can find a listing of them as follows:
```python
from gym import envs

print(envs.registry.all())
#> [EnvSpec(DoubleDunk-v0), EnvSpec(InvertedDoublePendulum-v0), EnvSpec(BeamRider-v0),
#>  EnvSpec(Phoenix-ram-v0), EnvSpec(Asterix-v0), EnvSpec(TimePilot-v0), EnvSpec(Alien-v0),
#>  EnvSpec(Robotank-ram-v0), EnvSpec(CartPole-v0), EnvSpec(Berzerk-v0), EnvSpec(Berzerk-ram-v0),
#>  EnvSpec(Gopher-ram-v0), ...
```
This will give you a list of `EnvSpec`s. These define parameters for a particular task, including the number of trials to run and the maximum number of steps. For example, `EnvSpec(Hopper-v1)` defines an environment where the goal is to get a 2D simulated robot to hop; `EnvSpec(Go9x9-v0)` defines a Go game on a 9x9 board.
```python
def setup():
    ##################################
    # Choose environment.
    ##################################
    default_env_name = 'CartPole-v0'
    # default_env_name = 'MountainCar-v0'
    # default_env_name = 'Pendulum-v0'

    # if env_name provided as cmd line arg, then use that
    env_name = sys.argv[1] if len(sys.argv) > 1 else default_env_name
    env = gym.make(env_name)  # --> load the chosen prebuilt environment

    state_dim, action_dim = init(env, env_name)         # -->
    network_vars = get_network(state_dim, action_dim)   # -->
    init_session()
    return env, state_dim, action_dim, network_vars
```
```python
def init(env, env_name):
    """
    Initialise any globals, e.g. the replay_buffer, epsilon, etc.

    return:
        state_dim : The length of the state vector for the env
        action_dim: The length of the action space, i.e. the number of actions

    NB: for discrete action envs such as the cartpole and mountain car, this
    function can be left unchanged.

    Hints for envs with continuous action spaces, e.g. "Pendulum-v0":
    1) you'll need to modify this function to discretise the action space and
       create a global dictionary mapping from action index to action (which
       you can use in `get_env_action()`)
    2) for Pendulum-v0, `env.action_space.low[0]` and `env.action_space.high[0]`
       are the limits of the action space.
    3) setting a global flag `is_continuous`, which you can use in
       `get_env_action()`, might help in using the same code for discrete and
       (discretised) continuous action spaces
    """
    global replay_buffer, epsilon
    replay_buffer = []
    epsilon = INITIAL_EPSILON

    state_dim = env.observation_space.shape[0]
    action_dim = env.action_space.n
    return state_dim, action_dim
```
`state_dim` is the length of the observation vector, `env.observation_space.shape[0]`: for CartPole-v0 this is 4 (cart position, cart velocity, pole angle, pole velocity at tip — the `Box(4)` observation space tabulated above). `action_dim = env.action_space.n` is 2, matching the `Discrete(2)` action space.
```python
def get_network(state_dim, action_dim, hidden_nodes=HIDDEN_NODES):
    """Define the neural network used to approximate the q-function.

    The suggested structure is to have each output node represent a Q value
    for one action. e.g. for cartpole there will be two output nodes.

    Hints:
    1) Given how q-values are used within RL, is it necessary to have output
       activation functions?
    2) You will set `target_in` in `get_train_batch` further down. Probably
       best to implement that before implementing the loss (there are further
       hints there).
    """
    action_in = tf.placeholder("float", [None, action_dim])
    target_in = tf.placeholder("float", [None])

    #################
    # input layer
    #################
    state_in = tf.placeholder("float", [None, state_dim])

    #################
    # network weights
    #################
    num_hiddenCell = 20
    W1 = tf.Variable(tf.constant(0.01, shape=[state_dim, num_hiddenCell]))
    b1 = tf.Variable(tf.constant(0.01, shape=[num_hiddenCell]))
    W2 = tf.Variable(tf.constant(0.01, shape=[num_hiddenCell, action_dim]))
    b2 = tf.Variable(tf.constant(0.01, shape=[action_dim]))

    #################
    # (1) hidden layer
    #################
    h_layer = tf.nn.relu(tf.matmul(state_in, W1) + b1)

    #################
    # (2) Q Value layer
    #################
    # Q network: input is state_in, with action_dim outputs, which are the
    # network's estimation of the Q values for those actions and the input
    # state. The final layer is assigned to the variable q_values.
    q_values = tf.matmul(h_layer, W2) + b2

    # Q value of the action actually taken (action_in is one-hot)
    q_selected_action = tf.reduce_sum(tf.multiply(q_values, action_in),
                                      reduction_indices=1)

    # Loss: mean squared error between the selected action's Q value and the
    # Bellman target fed in through target_in (see get_train_batch below)
    loss = tf.reduce_mean(tf.square(tf.subtract(q_selected_action, target_in)))

    optimise_step = tf.train.AdamOptimizer().minimize(loss)
    train_loss_summary_op = tf.summary.scalar("TrainingLoss", loss)
    return state_in, action_in, target_in, q_values, q_selected_action, \
        loss, optimise_step, train_loss_summary_op
```
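In equation form, the network above is trained to minimise the standard one-step Q-learning loss: `q_selected_action` plays the role of Q(s, a; θ), and `target_in` carries the target y computed in `get_train_batch()` below.

```latex
L(\theta) = \frac{1}{B}\sum_{i=1}^{B}\bigl(y_i - Q(s_i, a_i;\theta)\bigr)^2,
\qquad
y_i =
\begin{cases}
  r_i & \text{if } s'_i \text{ is terminal} \\
  r_i + \gamma \max_{a'} Q(s'_i, a';\theta) & \text{otherwise}
\end{cases}
```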
```python
def main():
    env, state_dim, action_dim, network_vars = setup()
    qtrain(env, state_dim, action_dim, *network_vars, render=True)
```
```python
def qtrain(env, state_dim, action_dim,
           state_in, action_in, target_in, q_values, q_selected_action,
           loss, optimise_step, train_loss_summary_op,
           num_episodes=NUM_EPISODES, ep_max_steps=EP_MAX_STEPS,
           test_frequency=TEST_FREQUENCY, num_test_eps=NUM_TEST_EPS,
           final_epsilon=FINAL_EPSILON, epsilon_decay_steps=EPSILON_DECAY_STEPS,
           force_test_mode=False, render=True):
    global epsilon

    # Record the number of times we do a training batch, take a step, and
    # the total_reward across all eps
    batch_presentations_count = total_steps = total_reward = 0

    for episode in range(num_episodes):
        # initialize task
        state = env.reset()
        if render:
            env.render()

        # Update epsilon once per episode - exp decaying
        epsilon -= (epsilon - final_epsilon) / epsilon_decay_steps

        # in test mode we set epsilon to 0
        test_mode = force_test_mode or \
            ((episode % test_frequency) < num_test_eps and episode > num_test_eps)
        if test_mode:
            print("Test mode (epsilon set to 0.0)")

        ep_reward = 0
        # All this inner loop cares about is collecting good samples.
        for step in range(ep_max_steps):
            total_steps += 1

            # Get an action and take a step in the environment:
            # given the current state, what action does the (still naive) network pick?
            action = get_action(state, state_in, q_values, epsilon, test_mode,
                                action_dim)
            env_action = get_env_action(action)

            # What next state does that action lead to?
            next_state, reward, done, _ = env.step(env_action)
            ep_reward += reward

            # display the updated environment
            if render:
                env.render()  # comment this line to possibly reduce training time

            # Add the (s, a, r, s', done) sample to the replay_buffer --
            # even a poor transition gets stored.
            update_replay_buffer(replay_buffer, state, action, reward,
                                 next_state, done, action_dim)
            state = next_state

            # Perform a training step once the replay_buffer holds a batch
            # worth of samples; only then does training start.
            if len(replay_buffer) > BATCH_SIZE:
                do_train_step(replay_buffer, state_in, action_in, target_in,
                              q_values, q_selected_action, loss, optimise_step,
                              train_loss_summary_op, batch_presentations_count)
                batch_presentations_count += 1

            if done:
                break

        total_reward += ep_reward
        test_or_train = "test" if test_mode else "train"
        print("end {0} episode {1}, ep reward: {2}, ave reward: {3}, "
              "Batch presentations: {4}, epsilon: {5}".format(
                  test_or_train, episode, round(ep_reward, 2),
                  total_reward / (episode + 1), batch_presentations_count,
                  epsilon))
```
```python
def get_action(state, state_in, q_values, epsilon, test_mode, action_dim):
    Q_estimates = q_values.eval(feed_dict={state_in: [state]})[0]
    epsilon_to_use = 0.0 if test_mode else epsilon
    if random.random() < epsilon_to_use:
        # explore: pick a random action
        action = random.randint(0, action_dim - 1)
    else:
        # exploit: of the action_dim possible actions, pick the one with the
        # largest estimated Q value
        action = np.argmax(Q_estimates)
    return action
```
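`get_env_action()` and `update_replay_buffer()` are called in `qtrain()` but are not shown in this excerpt. A minimal sketch of what they could look like, assuming the buffer stores `(state, one-hot action, reward, next_state, done)` tuples (which is what `get_train_batch()` below unpacks) and is capped at a hypothetical `REPLAY_SIZE` constant:

```python
def get_env_action(action):
    """Map the network's action index to the action the env expects.
    For discrete envs such as CartPole the index can be passed through
    unchanged; a discretised continuous env would look the index up in a
    dictionary built in init()."""
    return action


def update_replay_buffer(replay_buffer, state, action, reward, next_state,
                         done, action_dim):
    """Append an (s, a, r, s', done) transition, one-hot encoding the action
    so it can be fed straight into the action_in placeholder."""
    one_hot_action = np.zeros(action_dim)
    one_hot_action[action] = 1
    replay_buffer.append((state, one_hot_action, reward, next_state, done))
    # Evict the oldest transition once the buffer is full (REPLAY_SIZE is
    # assumed to be defined with the other hyperparameters).
    if len(replay_buffer) > REPLAY_SIZE:
        replay_buffer.pop(0)
```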
The `replay_buffer` now holds a stock of transition samples to train from.
```python
def do_train_step(replay_buffer, state_in, action_in, target_in,
                  q_values, q_selected_action, loss, optimise_step,
                  train_loss_summary_op, batch_presentations_count):
    minibatch = random.sample(replay_buffer, BATCH_SIZE)
    target_batch, state_batch, action_batch = \
        get_train_batch(q_values, state_in, minibatch)  # ---->

    summary, _ = session.run([train_loss_summary_op, optimise_step], feed_dict={
        target_in: target_batch,
        state_in: state_batch,
        action_in: action_batch
    })
    writer.add_summary(summary, batch_presentations_count)
```
```python
def get_train_batch(q_values, state_in, minibatch):
    """
    Generate batch samples for training by sampling the replay buffer.

    Batch values are suggested to be the following:
        state_batch:  Batch of state values
        action_batch: Batch of action values
        target_batch: Target batch for the (s, a) pair, i.e. one application
                      of the Bellman update rule.

    return: target_batch, state_batch, action_batch

    Hints:
    1) To calculate the target batch values, you will need to use the q_values
       for the next_state for each entry in the batch.
    2) The target value, combined with your loss defined in `get_network()`,
       should reflect the equation in the middle of slide 12 of the Deep RL 1
       lecture notes here:
       https://webcms3.cse.unsw.edu.au/COMP9444/17s2/resources/12494
    """
    state_batch = [data[0] for data in minibatch]
    action_batch = [data[1] for data in minibatch]
    reward_batch = [data[2] for data in minibatch]
    next_state_batch = [data[3] for data in minibatch]

    target_batch = []
    Q_value_batch = q_values.eval(feed_dict={state_in: next_state_batch})
    for i in range(0, BATCH_SIZE):
        sample_is_done = minibatch[i][4]
        if sample_is_done:
            # terminal transition: the target is just the reward
            target_batch.append(reward_batch[i])
        else:
            # Bellman target: r + gamma * max_a' Q(s', a')
            target_val = reward_batch[i] + GAMMA * np.max(Q_value_batch[i])
            target_batch.append(target_val)
    return target_batch, state_batch, action_batch
```
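The hyperparameter constants, the imports, the TensorFlow session and summary writer (`session`, `writer`, `init_session()`), and the script entry point used above are not part of this excerpt. One plausible set of definitions, with values chosen purely for illustration:

```python
import sys
import random

import gym
import numpy as np
import tensorflow as tf

# Hyperparameters -- illustrative values, not necessarily the author's.
GAMMA = 0.9                  # discount factor for the Bellman target
INITIAL_EPSILON = 0.6        # starting epsilon for epsilon-greedy exploration
FINAL_EPSILON = 0.1          # floor that epsilon decays towards
EPSILON_DECAY_STEPS = 100    # controls how quickly epsilon decays per episode
HIDDEN_NODES = 20            # default hidden width (get_network hard-codes 20 anyway)
BATCH_SIZE = 128             # minibatch size sampled from the replay buffer
REPLAY_SIZE = 10000          # maximum replay buffer length
NUM_EPISODES = 500           # total training episodes
EP_MAX_STEPS = 200           # CartPole-v0 caps episodes at 200 steps anyway
TEST_FREQUENCY = 50          # how often (in episodes) to run test episodes
NUM_TEST_EPS = 5             # number of test episodes per test block


def init_session():
    """Create the global TF session and summary writer used by do_train_step()."""
    global session, writer
    session = tf.InteractiveSession()
    session.run(tf.global_variables_initializer())
    writer = tf.summary.FileWriter("logs/", session.graph)


if __name__ == "__main__":
    main()
```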