[DQN] OpenAI Gym - CartPole


From: https://zhuanlan.zhihu.com/p/21477488

From: OpenAI Gym, a simulated-annealing solution for CartPole

Env setting: https://gym.openai.com/docs/

CartPole-v0 (openai/gym)

 


CartPole-v0

A pole is attached by an un-actuated joint to a cart, which moves along a frictionless track.

The system is controlled by applying a force of +1 or -1 to the cart. The pendulum starts upright, and the goal is to prevent it from falling over.

A reward of +1 is provided for every timestep that the pole remains upright.

The episode ends when the pole is more than 15 degrees from vertical, or the cart moves more than 2.4 units from the center.

 

Environment

Observation

Type: Box(4)

Num  Observation            Min                    Max
0    Cart Position          -2.4                   2.4
1    Cart Velocity          -Inf                   Inf
2    Pole Angle             ~ -0.418 rad (-24°)    ~ 0.418 rad (24°)
3    Pole Velocity At Tip   -Inf                   Inf

Actions

Type: Discrete(2)

Num Action
0 Push cart to the left
1 Push cart to the right

Note: The amount the velocity is reduced or increased is not fixed, as it depends on the angle the pole is pointing. This is because the center of gravity of the pole increases the amount of energy needed to move the cart underneath it.

Reward

Reward is 1 for every step taken, including the termination step.

Starting State

All observations are assigned a uniformly random value in [-0.05, 0.05].

Episode Termination

  1. Pole Angle is more than ±12° (±0.209 rad)
  2. Cart Position is more than ±2.4 (center of the cart reaches the edge of the display)
  3. Episode length is greater than 200

Solved Requirements

Considered solved when the average reward is greater than or equal to 195.0 over 100 consecutive trials.
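This criterion is easy to check with a running average over the last 100 episode rewards. A minimal sketch (episode_rewards and the helper below are illustrative, not part of the original code; the 195.0 threshold matches the value quoted above):

from collections import deque

episode_rewards = deque(maxlen=100)   # rewards of the most recent 100 episodes

def is_solved(episode_rewards, threshold=195.0):
    # Solved: average reward over the last 100 consecutive episodes >= threshold.
    return len(episode_rewards) == 100 and sum(episode_rewards) / 100.0 >= threshold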

 

 

From: https://gym.openai.com/docs/

A single demo run: reset once, then take random actions.

import gym

env = gym.make('CartPole-v0')
env.reset()                              # start here
for _ in range(1000):
    env.render()
    env.step(env.action_space.sample())  # take a random action

 

Observations

If we ever want to do better than take random actions at each step, it'd probably be good to actually know what our actions are doing to the environment.

The environment's step function returns exactly what we need. In fact, step returns four values. These are:

    • observation (object): an environment-specific object representing your observation of the environment. For example, pixel data from a camera, joint angles and joint velocities of a robot, or the board state in a board game.
    • reward (float): amount of reward achieved by the previous action. The scale varies between environments, but the goal is always to increase your total reward.
    • done (boolean): whether it's time to reset the environment again. Most (but not all) tasks are divided up into well-defined episodes, and done being True indicates the episode has terminated. (For example, perhaps the pole tipped too far, or you lost your last life.)
    • info (dict): diagnostic information useful for debugging. It can sometimes be useful for learning (for example, it might contain the raw probabilities behind the environment's last state change). However, official evaluations of your agent are not allowed to use this for learning.

This is just an implementation of the classic "agent-environment loop". Each timestep, the agent chooses an action, and the environment returns an observation and a reward.

The process gets started by calling reset, which returns an initial observation. So a more proper way of writing the previous code would be to respect the done flag:

import gym

env = gym.make('CartPole-v0')
for i_episode in range(20):
    observation = env.reset()
    for t in range(100):
        env.render()
        print(observation)
        action = env.action_space.sample()
        observation, reward, done, info = env.step(action)
        if done:
            print("Episode finished after {} timesteps".format(t + 1))
            break

 

Spaces

In the examples above, we've been sampling random actions from the environment's action space. But what actually are those actions? Every environment comes with first-class Space objects that describe the valid actions and observations:

import gym

env = gym.make('CartPole-v0')
print(env.action_space)
#> Discrete(2)
print(env.observation_space)
#> Box(4,)
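Spaces also support sampling and membership tests. A small self-contained sketch using the Space methods that classic gym provides (sample, contains, and the size attribute n for Discrete spaces):

from gym import spaces

space = spaces.Discrete(8)   # a set with 8 elements: {0, 1, ..., 7}
x = space.sample()           # draw a random element of the space
assert space.contains(x)     # membership test
assert space.n == 8          # Discrete spaces expose their size as .n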

  

Environments

gym's main purpose is to provide a large collection of environments that expose a common interface and are versioned to allow for comparisons. You can find a listing of them as follows:

from gym import envs

print(envs.registry.all())
#> [EnvSpec(DoubleDunk-v0), EnvSpec(InvertedDoublePendulum-v0), EnvSpec(BeamRider-v0), EnvSpec(Phoenix-ram-v0), EnvSpec(Asterix-v0), EnvSpec(TimePilot-v0), EnvSpec(Alien-v0), EnvSpec(Robotank-ram-v0), EnvSpec(CartPole-v0), EnvSpec(Berzerk-v0), EnvSpec(Berzerk-ram-v0), EnvSpec(Gopher-ram-v0), ...

 

This will give you a list of EnvSpecs. These define parameters for a particular task, including the number of trials to run and the maximum number of steps.

For example, EnvSpec(Hopper-v1) defines an environment where the goal is to get a 2D simulated robot to hop; EnvSpec(Go9x9-v0) defines a Go game on a 9x9 board.
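A single spec can also be looked up by id. A minimal sketch, assuming the classic gym API (gym.spec) and the CartPole-v0 registration values quoted earlier (200 max steps, 195.0 reward threshold):

import gym

spec = gym.spec('CartPole-v0')      # fetch the registered EnvSpec by id
print(spec.max_episode_steps)       # expected: 200
print(spec.reward_threshold)        # expected: 195.0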

 

 

 
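The DQN code below refers to several imports and hyperparameter constants that are not shown in this excerpt (e.g. GAMMA, BATCH_SIZE, INITIAL_EPSILON). A plausible preamble is sketched here; every value is a hypothetical placeholder, not the original author's setting:

import random
import sys

import gym
import numpy as np
import tensorflow as tf

# Hypothetical hyperparameters (the original values are not shown in the excerpt).
GAMMA = 0.9                  # discount factor used in the Bellman target
INITIAL_EPSILON = 0.6        # starting exploration rate
FINAL_EPSILON = 0.1          # exploration rate that epsilon decays towards
EPSILON_DECAY_STEPS = 100    # episodes over which epsilon decays
HIDDEN_NODES = 20            # default hidden-layer width for get_network()
NUM_EPISODES = 500           # total training episodes
EP_MAX_STEPS = 200           # step cap per episode (CartPole-v0 terminates at 200 anyway)
TEST_FREQUENCY = 100         # how often (in episodes) to run test episodes
NUM_TEST_EPS = 10            # number of consecutive test episodes
BATCH_SIZE = 128             # minibatch size sampled from the replay buffer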

def setup():
    ##################################
    # Choose environment.
    ##################################
    default_env_name = 'CartPole-v0'
    # default_env_name = 'MountainCar-v0'
    # default_env_name = 'Pendulum-v0'

    # If env_name is provided as a command line argument, use it.
    env_name = sys.argv[1] if len(sys.argv) > 1 else default_env_name
    env = gym.make(env_name)   # --> load the preset environment
    ###########################################################################
    state_dim, action_dim = init(env, env_name)          # -->
    network_vars = get_network(state_dim, action_dim)    # -->
    init_session()
    return env, state_dim, action_dim, network_vars

 

def init(env, env_name):
    """
    Initialise any globals, e.g. the replay_buffer, epsilon, etc.

    return:
        state_dim : the length of the state vector for the env
        action_dim: the length of the action space, i.e. the number of actions

    NB: for discrete-action envs such as the cartpole and mountain car, this
    function can be left unchanged.

    Hints for envs with continuous action spaces, e.g. "Pendulum-v0":
    1) you'll need to modify this function to discretise the action space and
       create a global dictionary mapping from action index to action (which
       you can use in `get_env_action()`)
    2) for Pendulum-v0, `env.action_space.low[0]` and `env.action_space.high[0]`
       are the limits of the action space
    3) setting a global flag `iscontinuous`, which you can use in
       `get_env_action()`, might help in using the same code for discrete and
       (discretised) continuous action spaces
    """
    global replay_buffer, epsilon
    replay_buffer = []
    epsilon = INITIAL_EPSILON
    state_dim = env.observation_space.shape[0]
    action_dim = env.action_space.n
    return state_dim, action_dim

 

state_dim

For CartPole-v0, the observation space is Box(4), so state_dim = 4:

Num  Observation            Min                    Max
0    Cart Position          -2.4                   2.4
1    Cart Velocity          -Inf                   Inf
2    Pole Angle             ~ -0.418 rad (-24°)    ~ 0.418 rad (24°)
3    Pole Velocity At Tip   -Inf                   Inf
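setup() also calls init_session(), which is not included in the excerpt. A minimal sketch, assuming TensorFlow 1.x and the global session and writer that do_train_step() and the .eval() calls below rely on:

def init_session():
    # Assumed helper: create the default TF session and the summary writer
    # used by do_train_step(); run it after get_network() has built the graph.
    global session, writer
    session = tf.InteractiveSession()              # installs itself as the default session
    session.run(tf.global_variables_initializer())
    writer = tf.summary.FileWriter("logs", session.graph)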

 

 

def get_network(state_dim, action_dim, hidden_nodes=HIDDEN_NODES):
    """Define the neural network used to approximate the q-function.

    The suggested structure is to have each output node represent a Q value for
    one action, e.g. for cartpole there will be two output nodes.

    Hints:
    1) Given how q-values are used within RL, is it necessary to have output
       activation functions?
    2) You will set `target_in` in `get_train_batch` further down. Probably
       best to implement that before implementing the loss (there are further
       hints there).
    """
    action_in = tf.placeholder("float", [None, action_dim])
    target_in = tf.placeholder("float", [None])

    #################
    # input layer
    #################
    state_in = tf.placeholder("float", [None, state_dim])

    #################
    # network weights
    #################
    num_hiddenCell = 20   # NB: hard-coded; the hidden_nodes argument is not used here
    W1 = tf.Variable(tf.constant(0.01, shape=[state_dim, num_hiddenCell]))
    b1 = tf.Variable(tf.constant(0.01, shape=[num_hiddenCell]))
    W2 = tf.Variable(tf.constant(0.01, shape=[num_hiddenCell, action_dim]))
    b2 = tf.Variable(tf.constant(0.01, shape=[action_dim]))

    #################
    # (1) hidden layer
    #################
    h_layer = tf.nn.relu(tf.matmul(state_in, W1) + b1)

    #################
    # (2) Q Value layer
    #################
    # Q network: its input is state_in and it has action_dim outputs, which are
    # the network's estimation of the Q values for those actions and the input
    # state. The final layer is assigned to the variable q_values.
    q_values = tf.matmul(h_layer, W2) + b2

    # Q value of the selected action (action_in is a one-hot action vector).
    q_selected_action = tf.reduce_sum(tf.multiply(q_values, action_in),
                                      reduction_indices=1)

    # Loss: mean squared error between the predicted Q value and the target
    # (one application of the Bellman update; see get_train_batch below).
    loss = tf.reduce_mean(tf.square(tf.subtract(q_selected_action, target_in)))

    optimise_step = tf.train.AdamOptimizer().minimize(loss)
    train_loss_summary_op = tf.summary.scalar("TrainingLoss", loss)
    return state_in, action_in, target_in, q_values, q_selected_action, \
        loss, optimise_step, train_loss_summary_op
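Together with the target computed in get_train_batch() below, this loss is the standard one-step Q-learning objective, averaged over a minibatch:

L(\theta) = \mathbb{E}\left[ \left( r + \gamma \max_{a'} Q(s', a'; \theta) - Q(s, a; \theta) \right)^2 \right]

where the target collapses to just r when s' is terminal.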

 

 

def main():
    env, state_dim, action_dim, network_vars = setup()
    qtrain(env, state_dim, action_dim, *network_vars, render=True)

 

def qtrain(env, state_dim, action_dim,
           state_in, action_in, target_in, q_values, q_selected_action,
           loss, optimise_step, train_loss_summary_op,
           num_episodes=NUM_EPISODES, ep_max_steps=EP_MAX_STEPS,
           test_frequency=TEST_FREQUENCY, num_test_eps=NUM_TEST_EPS,
           final_epsilon=FINAL_EPSILON, epsilon_decay_steps=EPSILON_DECAY_STEPS,
           force_test_mode=False, render=True):
    global epsilon

    # Record the number of times we do a training batch, take a step, and
    # the total_reward across all eps.
    batch_presentations_count = total_steps = total_reward = 0

    ###########################################################################
    for episode in range(num_episodes):
        # initialize task
        state = env.reset()
        if render:
            env.render()

        # Update epsilon once per episode - exp decaying
        epsilon -= (epsilon - final_epsilon) / epsilon_decay_steps

        # in test mode we set epsilon to 0
        test_mode = force_test_mode or \
            ((episode % test_frequency) < num_test_eps and episode > num_test_eps)
        if test_mode:
            print("Test mode (epsilon set to 0.0)")

        ep_reward = 0

        #######################################################################
        # The main concern here is collecting good samples.
        for step in range(ep_max_steps):
            total_steps += 1

            # Get an action and take a step in the environment.
            # Given the current state, what action does the (still naive) network return?
            action = get_action(state, state_in, q_values, epsilon, test_mode,
                                action_dim)
            env_action = get_env_action(action)

            # What next state does that action lead to?
            next_state, reward, done, _ = env.step(env_action)
            ep_reward += reward

            # display the updated environment
            if render:
                env.render()  # comment this line to possibly reduce training time

            # Add the (s, a, r, s') sample to the replay_buffer, even if it is a poor one.
            update_replay_buffer(replay_buffer, state, action, reward,
                                 next_state, done, action_dim)
            state = next_state

            # Perform a training step once the replay_buffer holds a full batch of samples.
            if len(replay_buffer) > BATCH_SIZE:
                do_train_step(replay_buffer, state_in, action_in, target_in,
                              q_values, q_selected_action, loss, optimise_step,
                              train_loss_summary_op, batch_presentations_count)
                batch_presentations_count += 1

            if done:
                break

        total_reward += ep_reward
        test_or_train = "test" if test_mode else "train"
        print("end {0} episode {1}, ep reward: {2}, ave reward: {3}, "
              "Batch presentations: {4}, epsilon: {5}".format(
                  test_or_train, episode, round(ep_reward, 2),
                  total_reward / (episode + 1), batch_presentations_count,
                  epsilon))
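qtrain() calls get_env_action(), which is also not shown. For discrete-action environments such as CartPole the network's action index is already a valid environment action, so a minimal sketch is just the identity (a continuous env like Pendulum-v0 would instead map the index to a discretised force, as the hints in init() describe):

def get_env_action(action):
    # Assumed helper: for discrete-action envs the chosen index is the env action.
    return action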

 

def get_action(state, state_in, q_values, epsilon, test_mode, action_dim):
    Q_estimates = q_values.eval(feed_dict={state_in: [state]})[0]
    epsilon_to_use = 0.0 if test_mode else epsilon
    if random.random() < epsilon_to_use:
        action = random.randint(0, action_dim - 1)   # explore: take a random action
    else:
        action = np.argmax(Q_estimates)   # exploit: of the possible actions, pick the one with the largest Q estimate
    return action

The replay_buffer holds the collected experience samples.
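update_replay_buffer() is not shown either. A minimal sketch, assuming the buffer stores (state, one_hot_action, reward, next_state, done) tuples in the order that get_train_batch() reads them, and a hypothetical REPLAY_SIZE cap:

REPLAY_SIZE = 10000   # hypothetical maximum number of stored transitions

def update_replay_buffer(replay_buffer, state, action, reward, next_state,
                         done, action_dim):
    # Assumed helper: one-hot encode the action (the action_in placeholder
    # expects one-hot vectors) and append the transition, dropping the oldest
    # sample when the buffer is full.
    one_hot_action = np.zeros(action_dim)
    one_hot_action[action] = 1
    replay_buffer.append((state, one_hot_action, reward, next_state, done))
    if len(replay_buffer) > REPLAY_SIZE:
        replay_buffer.pop(0)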

def do_train_step(replay_buffer, state_in, action_in, target_in,
                  q_values, q_selected_action, loss, optimise_step,
                  train_loss_summary_op, batch_presentations_count):
    minibatch = random.sample(replay_buffer, BATCH_SIZE)
    target_batch, state_batch, action_batch = \
        get_train_batch(q_values, state_in, minibatch)   # ---->
    summary, _ = session.run([train_loss_summary_op, optimise_step],
                             feed_dict={
                                 target_in: target_batch,
                                 state_in: state_batch,
                                 action_in: action_batch
                             })
    writer.add_summary(summary, batch_presentations_count)

 

def get_train_batch(q_values, state_in, minibatch):
    """
    Generate batch samples for training by sampling the replay buffer.

    Batch values are suggested to be the following:
        state_batch:  batch of state values
        action_batch: batch of action values
        target_batch: target batch for the (s, a) pair, i.e. one application of
                      the Bellman update rule

    return: target_batch, state_batch, action_batch

    Hints:
    1) To calculate the target batch values, you will need to use the q_values
       for the next_state for each entry in the batch.
    2) The target value, combined with the loss defined in `get_network()`,
       should reflect the equation in the middle of slide 12 of the Deep RL 1
       lecture notes here:
       https://webcms3.cse.unsw.edu.au/COMP9444/17s2/resources/12494
    """
    state_batch = [data[0] for data in minibatch]
    action_batch = [data[1] for data in minibatch]
    reward_batch = [data[2] for data in minibatch]
    next_state_batch = [data[3] for data in minibatch]

    target_batch = []
    Q_value_batch = q_values.eval(feed_dict={state_in: next_state_batch})
    for i in range(0, BATCH_SIZE):
        sample_is_done = minibatch[i][4]
        if sample_is_done:
            # Terminal transition: the target is just the immediate reward.
            target_batch.append(reward_batch[i])
        else:
            # Bellman update: immediate reward plus the discounted best Q value
            # of the next state.
            target_val = reward_batch[i] + GAMMA * np.max(Q_value_batch[i])
            target_batch.append(target_val)
    return target_batch, state_batch, action_batch
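For completeness, the script presumably ends with the usual entry point (not shown in the excerpt):

if __name__ == "__main__":
    main()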

 

