Deep Q-Network 學習筆記(二)—— Q-Learning與神經網絡結合使用(有代碼實現)


參考資料:

https://morvanzhou.github.io/

非常感謝莫煩老師的教程

 

http://mnemstudio.org/path-finding-q-learning-tutorial.htm

http://www.cnblogs.com/dragonir/p/6224313.html

這篇文章也是用非常簡單的說明將 Q-Learning 的過程給講解清楚了

 

http://www.cnblogs.com/jinxulin/tag/%E5%A2%9E%E5%BC%BA%E5%AD%A6%E4%B9%A0/

還有感謝這位園友,將增強學習的原理講解的非常清晰

 

深度增強學習(DRL)漫談 - 從DQN到AlphaGo

這篇文章詳細描寫了 DQN 的演變過程,建議先看看 

 

目錄:

Deep Q-Network 學習筆記(一)—— Q-Learning

Deep Q-Network 學習筆記(二)—— Q-Learning與神經網絡結合使用

 

這里將使用 tensorflow 框架重寫上一篇的示例。

一、思路

Q-Learning與神經網絡結合使用就是 Deep Q-Network,簡稱 DQN。在現實中,狀態的數量極多,並且需要人工去設計特征,而且一旦特征設計不好,則得不到想要的結果。

神經網絡正是能處理解決這個問題,取代原來 Q 表的功能。

當神經網絡與Q-Learning結合使用的時候,又會碰到幾個問題:

1.loss 要怎么計算?

增強學習是試錯學習(Trail-and-error),由於沒有直接的指導信息,智能體要以不斷與環境進行交互,通過試錯的方式來獲得最佳策略。

Q-Learning正是其中的一種,所以Q值表中表示的是當前已學習到的經驗。而根據公式計算出的 Q 值是智能體通過與環境交互及自身的經驗總結得到的一個分數(即:目標 Q 值)。

最后使用目標 Q 值(target_q)更新原來舊的 Q 值(q)

而目標 Q 值與舊的 Q 值的對應關系,正好是監督學習神經網絡結果值與輸出值的對應關系。

所以,loss = (target_q - q)^2

即:整個訓練過程其實就是 Q 值(q)目標 Q 值(target_q)逼近的過程。

2.訓練樣本哪來?

在 DQN 中有 Experience Replay 的概念,就是經驗回放

就是先讓智能體去探索環境,將經驗(記憶)池累積到一定程度,在隨機抽取出一批樣本進行訓練。

為什么要隨機抽取?因為智能體去探索環境時采集到的樣本是一個時間序列,樣本之間具有連續性,如果每次得到樣本就更新Q值,受樣本分布影響,會對收斂造成影響。

從現在開始,一定要理清楚算法的所有思路,比如什么時候該做什么,怎么隨機選擇動作,神經網絡的參數是否調試完成等等,各種問題調試都沒結果的,就因為這個卡在這里大半個星期才搞定。

二、模擬流程

1.隨機初始化一個狀態 s,初始化記憶池,設置觀察值。

2.循環遍歷(是永久遍歷還是只遍歷一定次數這個自己設置):

  (1)根據策略選擇一個行為(a)。

  (2)執行該行動(a),得到獎勵(r)、執行該行為后的狀態 s`和游戲是否結束 done。

  (3)保存 s, a, r, s`, done 到記憶池里。

  (4)判斷記憶池里的數據是否足夠(即:記憶池里的數據數量是否超過設置的觀察值),如果不夠,則轉到(5)步。

         ① 在記憶池里隨機抽取出一部分數據做為訓練樣本。

         ② 將所有訓練樣本的 s`做為神經網絡的輸入值,進行批量處理,得到 s`狀態下每個行為的 q 值的表。

         ③ 根據公式計算出 q 值表對應的 target_q 值表。

             公式:Q(s, a) = r + Gamma * Max[Q(s`, all actions)]

         ④ 使用 q 與 target_q 訓練神經網絡。

  (5)判斷游戲是否結束。

         ① 游戲結束,給 s 隨機設置一個狀態。

         ① 未結束,則當前狀態 s 更新為 s`。

三、代碼實現

首先,創建一個類來實現 DQN。

import tensorflow as tf
import numpy as np
from collections import deque
import random


class DeepQNetwork:
    r = np.array([[-1, -1, -1, -1, 0, -1],
                  [-1, -1, -1, 0, -1, 100.0],
                  [-1, -1, -1, 0, -1, -1],
                  [-1, 0, 0, -1, 0, -1],
                  [0, -1, -1, 1, -1, 100],
                  [-1, 0, -1, -1, 0, 100],
                  ])

    # 執行步數。
    step_index = 0

    # 狀態數。
    state_num = 6

    # 動作數。
    action_num = 6

    # 訓練之前觀察多少步。
    OBSERVE = 1000.

    # 選取的小批量訓練樣本數。
    BATCH = 20

    # epsilon 的最小值,當 epsilon 小於該值時,將不在隨機選擇行為。
    FINAL_EPSILON = 0.0001

    # epsilon 的初始值,epsilon 逐漸減小。
    INITIAL_EPSILON = 0.1

    # epsilon 衰減的總步數。
    EXPLORE = 3000000.

    # 探索模式計數。
    epsilon = 0

    # 訓練步數統計。
    learn_step_counter = 0

    # 學習率。
    learning_rate = 0.001

    # γ經驗折損率。
    gamma = 0.9

    # 記憶上限。
    memory_size = 5000

    # 當前記憶數。
    memory_counter = 0

    # 保存觀察到的執行過的行動的存儲器,即:曾經經歷過的記憶。
    replay_memory_store = deque()

    # 生成一個狀態矩陣(6 X 6),每一行代表一個狀態。
    state_list = None

    # 生成一個動作矩陣。
    action_list = None

    # q_eval 網絡。
    q_eval_input = None
    action_input = None
    q_target = None
    q_eval = None
    predict = None
    loss = None
    train_op = None
    cost_his = None
    reward_action = None

    # tensorflow 會話。
    session = None

    def __init__(self, learning_rate=0.001, gamma=0.9, memory_size=5000):
        self.learning_rate = learning_rate
        self.gamma = gamma
        self.memory_size = memory_size

        # 初始化成一個 6 X 6 的狀態矩陣。
        self.state_list = np.identity(self.state_num)

        # 初始化成一個 6 X 6 的動作矩陣。
        self.action_list = np.identity(self.action_num)

        # 創建神經網絡。
        self.create_network()

        # 初始化 tensorflow 會話。
        self.session = tf.InteractiveSession()

        # 初始化 tensorflow 參數。
        self.session.run(tf.initialize_all_variables())

        # 記錄所有 loss 變化。
        self.cost_his = []

    def create_network(self):
        """
        創建神經網絡。
        :return:
        """
        pass

    def select_action(self, state_index):
        """
        根據策略選擇動作。
        :param state_index: 當前狀態。
        :return:
        """
        pass

    def save_store(self, current_state_index, current_action_index, current_reward, next_state_index, done):
        """
        保存記憶。
        :param current_state_index: 當前狀態 index。
        :param current_action_index: 動作 index。
        :param current_reward: 獎勵。
        :param next_state_index: 下一個狀態 index。
        :param done: 是否結束。
        :return:
        """
        pass

    def step(self, state, action):
        """
        執行動作。
        :param state: 當前狀態。
        :param action: 執行的動作。
        :return:
        """
        pass

    def experience_replay(self):
        """
        記憶回放。
        :return:
        """
        pass

    def train(self):
        """
        訓練。
        :return:
        """
        pass

    def pay(self):
        """
        運行並測試。
        :return:
        """
        pass


if __name__ == "__main__":
    q_network = DeepQNetwork()
    q_network.pay()

 

1.將狀態與動作初始化成以下矩陣,以方便處理。

            圖 3.1

 

 四、創建神經網絡

然后,創建一個神經網絡,並使用該神經網絡來替換掉 Q 值表(上一篇中的 Q 矩陣)

神經網絡的輸入是 Agent 當前的狀態,輸出是 Agent 當前狀態可以執行的動作的 Q 值表

由於總共有 6 個狀態6 種動作,所以,這里將創建一個簡單 3 層的神經網絡,輸入層的參數是 6 個輸出層輸出 6 個值,運行並調試好參數,確認能正常運行。

 測試代碼:

import tensorflow as tf
import numpy as np

input_num = 6
output_num = 6
x_data = np.linspace(-1, 1, 300).reshape((-1, input_num))  # 轉為列向量

noise = np.random.normal(0, 0.05, x_data.shape)
y_data = np.square(x_data) + 0.5 + noise

xs = tf.placeholder(tf.float32, [None, input_num])  # 樣本數未知,特征數為 6,占位符最后要以字典形式在運行中填入
ys = tf.placeholder(tf.float32, [None, output_num])

neuro_layer_1 = 3
w1 = tf.Variable(tf.random_normal([input_num, neuro_layer_1]))
b1 = tf.Variable(tf.zeros([1, neuro_layer_1]) + 0.1)
l1 = tf.nn.relu(tf.matmul(xs, w1) + b1)

neuro_layer_2 = output_num
w2 = tf.Variable(tf.random_normal([neuro_layer_1, neuro_layer_2]))
b2 = tf.Variable(tf.zeros([1, neuro_layer_2]) + 0.1)
l2 = tf.matmul(l1, w2) + b2

# reduction_indices=[0] 表示將列數據累加到一起。
# reduction_indices=[1] 表示將行數據累加到一起。
loss = tf.reduce_mean(tf.reduce_sum(tf.square((ys - l2)), reduction_indices=[1]))

# 選擇梯度下降法
train = tf.train.GradientDescentOptimizer(0.001).minimize(loss)
# train = tf.train.AdamOptimizer(1e-1).minimize(loss)

init = tf.initialize_all_variables()
sess = tf.Session()
sess.run(init)

for i in range(100000):
    sess.run(train, feed_dict={xs: x_data, ys: y_data})
    if i % 1000 == 0:
        print(sess.run(loss, feed_dict={xs: x_data, ys: y_data}))

執行后 loss 一直持續減少,確認該神經網絡正常運行就行了,注意調好學習率和神經網絡的層數及神經元個數。

 

確認正常后,開始實現 DeepQNetwork 類中的 def create_network(self) 函數:

    def create_network(self):
        """
        創建神經網絡。
        :return:
        """
        self.q_eval_input = tf.placeholder(shape=[None, self.state_num], dtype=tf.float32)
        self.action_input = tf.placeholder(shape=[None, self.action_num], dtype=tf.float32)
        self.q_target = tf.placeholder(shape=[None], dtype=tf.float32)

        neuro_layer_1 = 3
        w1 = tf.Variable(tf.random_normal([self.state_num, neuro_layer_1]))
        b1 = tf.Variable(tf.zeros([1, neuro_layer_1]) + 0.1)
        l1 = tf.nn.relu(tf.matmul(self.q_eval_input, w1) + b1)

        w2 = tf.Variable(tf.random_normal([neuro_layer_1, self.action_num]))
        b2 = tf.Variable(tf.zeros([1, self.action_num]) + 0.1)
        self.q_eval = tf.matmul(l1, w2) + b2

        # 取出當前動作的得分。
        self.reward_action = tf.reduce_sum(tf.multiply(self.q_eval, self.action_input), reduction_indices=1)
        self.loss = tf.reduce_mean(tf.square((self.q_target - self.reward_action)))
        self.train_op = tf.train.GradientDescentOptimizer(self.learning_rate).minimize(self.loss)

        self.predict = tf.argmax(self.q_eval, 1)

這里說明一下 loss 的計算,由於狀態是根據圖 3.1 的矩陣的方式顯示的,比如,當前狀態如果是在 1 號房間,

輸入參數(q_eval_input)的值是:[[0, 1, 0, 0, 0, 0]]

由於 Agent 執行了動作 3,也就是移動到了 3 號房間,

所以 Agent動作參數(action_input)的值是:[[0, 0, 0, 1, 0, 0]]

因為神經網絡的輸出結果(q_eval)是 Agent 當前狀態下可執行的動作的價值,由於每個狀態都有 6 個動作,而狀態數也是 6 個,所以

神經網絡的輸出結果(q_eval)輸入參數是一樣的,所以輸出的格式也一樣,

假設輸出結果(q_eval)是:[[0.81, 0.5, 0.24, 0.513, 0.9, 0.71]]

tf.multiply(self.q_eval, self.action_input)

就是矩陣的點積,也就是每個元素分別相乘。

這里表示的就是獲得 Agent 執行了 action_input價值(Q 值)

也就是 q = q_eval * action_input = [[0, 0, 0, 0.513, 0, 0]]

所以:

self.reward_action = tf.reduce_sum(tf.multiply(self.q_eval, self.action_input), reduction_indices=1)

也就相當於:q = SUM(q_eval * action_input) = SUM([[0, 0, 0, 0.513, 0, 0, 0]]) = 0.513

即:Agent 1 號房間執行移動到 3 號房間動作時,神經網絡給出的價值(q 值) 0.513 

而 q_target 在前面也提過,是 Agent 經過環境體驗后,根據公式計算出來的目標 q 值,假設該值是 1.03

所以:

self.loss = tf.reduce_mean(tf.square((self.q_target - self.reward_action)))

就相當於:

loss = ((1.03 - 0.513)^2) / 1 = 0.267289

然后,learning_rate 也就是學習率,經過調試,這里是設置成 0.001 比較好。

五、搜索動作策略

這里是 DQN 需要注意的地方之一,這里的方法將直接影響到 DQN 是否可以收斂,或者是否是陷入局部最小值等情況。

現在在這里選擇了最直接的方法,使用隨機的方式來選擇行動。

使用隨機的方式來選擇行動,可以讓 Agent 能得到更多的探索機會,這樣在訓練時才能有效的跳出陷入局部最小值的情況,當訓練時,可以減少探索機會。

流程如下:

    1.初始化 epsilon 變量,並設置它的最小值(FINAL_EPSILON)最大值(INITIAL_EPSILON),並將 epsilon初始值設置成 INITIAL_EPSILON

    2.隨機生成一個數 n。

    3.判斷 n 是否小於 epsilon,如果 n 小於 epsilon 則轉到 4否則轉到 5

    4.使用隨機策略(增加探索機會),轉到 6。

       隨機選擇一個在 Agent 當前狀態下可以執行的動作

    5.使用神經網絡直接計算出結果(實際應用時也是應用這方法),轉到6。

       神經網絡會輸出在當前狀態下所有動作的 Q 值,選擇其中最有價值(Q 值最大)的動作返回。

    6.判斷是否開始訓練,如果是,則逐步減少 epsilon 來減少探索機會,否則跳過。

 

開始實現 DeepQNetwork 類中的 def select_action(self, state_index) 函數:

    def select_action(self, state_index):
        """
        根據策略選擇動作。
        :param state_index: 當前狀態。
        :return:
        """
        current_state = self.state_list[state_index:state_index + 1]

        if np.random.uniform() < self.epsilon:
            current_action_index = np.random.randint(0, self.action_num)
        else:
            actions_value = self.session.run(self.q_eval, feed_dict={self.q_eval_input: current_state})
            action = np.argmax(actions_value)
            current_action_index = action

        # 開始訓練后,在 epsilon 小於一定的值之前,將逐步減小 epsilon。
        if self.step_index > self.OBSERVE and self.epsilon > self.FINAL_EPSILON:
            self.epsilon -= (self.INITIAL_EPSILON - self.FINAL_EPSILON) / self.EXPLORE

        return current_action_index

 

六、保存記憶

 這里使用了一個先進先出的隊列,設置好隊列的 size,直接將“當前狀態”、“執行動作”、“獎勵分數”、“下一個狀態”和“游戲是否結束”保存進去就行了。

 開始實現 DeepQNetwork 類中的 def save_store(self, current_state_index, current_action_index, current_reward, next_state_index, done) 函數:

    def save_store(self, current_state_index, current_action_index, current_reward, next_state_index, done):
        """
        保存記憶。
        :param current_state_index: 當前狀態 index。
        :param current_action_index: 動作 index。
        :param current_reward: 獎勵。
        :param next_state_index: 下一個狀態 index。
        :param done: 是否結束。
        :return:
        """
        current_state = self.state_list[current_state_index:current_state_index + 1]
        current_action = self.action_list[current_action_index:current_action_index + 1]
        next_state = self.state_list[next_state_index:next_state_index + 1]
        # 記憶動作(當前狀態, 當前執行的動作, 當前動作的得分,下一個狀態)。
        self.replay_memory_store.append((
            current_state,
            current_action,
            current_reward,
            next_state,
            done))

        # 如果超過記憶的容量,則將最久遠的記憶移除。
        if len(self.replay_memory_store) > self.memory_size:
            self.replay_memory_store.popleft()

        self.memory_counter += 1

 

七、執行動作

這里就是取得游戲是否結束狀態,動作獎勵和下一個狀態並返回就可以了。

開始實現 DeepQNetwork 類中的 def step(self, state, action) 函數:

    def step(self, state, action):
        """
        執行動作。
        :param state: 當前狀態。
        :param action: 執行的動作。
        :return:
        """
        reward = self.r[state][action]

        next_state = action

        done = False

        if action == 5:
            done = True

        return next_state, reward, done

 

八、記憶回放

 這是 DQN 的重點之一,在記憶池里隨機抽取出一小批的數據當做訓練樣本,並計算出目標 Q 值來訓練神經網絡。

流程如下:

    1. 初始化時先設置抽取的樣本數。

    2. 從記憶池里隨機抽取出一批樣本。

    3. 由於每條樣本中,都保存有當時的數據(當前狀態,動作,獎勵分數,下一個狀態,是否結束),所以,為了計算出這些樣本數據的目標 Q 值,就必須先取出樣本中“下一個狀態(next_state)”(注意:這里取到的是所有這批樣本的“下一個狀態”的列表!)。

    4. 將 next_state (這是批數據!!)當做參數傳入神經網絡,得到 Agent 在 next_state 狀態時所有可執行的動作的 Q 值表(q_next),q_next 表示這批樣本中所有的 next_state 狀態的 Q 值表的集合

    5. 現在,已經拿到了

        Agent 當時的狀態(state),

        當時的動作(action),

        當時的狀態(state)下執行動作(action)得到的獎勵R(state, action),

        當時的狀態(state)下執行動作(action)后的狀態(next_state)下所有可執行的動作的 Q 值表(q_next)

        現在就可以使用上面提到的公式來計算出目標 Q 值Q(state, action)。

        Q(state, action) = R(state, action) + Gamma * Max{q_next}

    6. 根據游戲狀態判斷,當前選擇的動作是否是違規(不可執行)的動作,如果,則不做經驗計算,直接扣除分數,否則使用上面的公式來計算出Q(state, action)違規的動作,在搜索動作的時候,是不應該能選擇出來的,即:能走到這一步的都必須是有效動作

    7. 將計算得到的所有樣本的 Q(state, action) 保存到集合中(q_target)。

    8. 將這批樣本當前狀態的集合,動作的集合與 q_target 傳入神經網絡並進行訓練

 特別注意第 6 條的內容,如果這里處理不好,一樣會得不到結果的,具體原因可以看上一篇

 

開始實現 DeepQNetwork 類中的 def experience_replay(self)函數:

    def experience_replay(self):
        """
        記憶回放。
        :return:
        """
        # 隨機選擇一小批記憶樣本。
        batch = self.BATCH if self.memory_counter > self.BATCH else self.memory_counter
        minibatch = random.sample(self.replay_memory_store, batch)

        batch_state = None
        batch_action = None
        batch_reward = None
        batch_next_state = None
        batch_done = None

        for index in range(len(minibatch)):
            if batch_state is None:
                batch_state = minibatch[index][0]
            elif batch_state is not None:
                batch_state = np.vstack((batch_state, minibatch[index][0]))

            if batch_action is None:
                batch_action = minibatch[index][1]
            elif batch_action is not None:
                batch_action = np.vstack((batch_action, minibatch[index][1]))

            if batch_reward is None:
                batch_reward = minibatch[index][2]
            elif batch_reward is not None:
                batch_reward = np.vstack((batch_reward, minibatch[index][2]))

            if batch_next_state is None:
                batch_next_state = minibatch[index][3]
            elif batch_next_state is not None:
                batch_next_state = np.vstack((batch_next_state, minibatch[index][3]))

            if batch_done is None:
                batch_done = minibatch[index][4]
            elif batch_done is not None:
                batch_done = np.vstack((batch_done, minibatch[index][4]))

        # q_next:下一個狀態的 Q 值。
        q_next = self.session.run([self.q_eval], feed_dict={self.q_eval_input: batch_next_state})

        q_target = []
        for i in range(len(minibatch)):
            # 當前即時得分。
            current_reward = batch_reward[i][0]

            # # 游戲是否結束。
            # current_done = batch_done[i][0]

            # 更新 Q 值。
            q_value = current_reward + self.gamma * np.max(q_next[0][i])

            # 當得分小於 -1 時,表示走了不可走的位置。
            if current_reward <= -1:
                q_target.append(current_reward)
            else:
                q_target.append(q_value)

        _, cost, reward = self.session.run([self.train_op, self.loss, self.reward_action],
                                           feed_dict={self.q_eval_input: batch_state,
                                                      self.action_input: batch_action,
                                                      self.q_target: q_target})

        self.cost_his.append(cost)

        # if self.step_index % 1000 == 0:
        #     print("loss:", cost)

        self.learn_step_counter += 1

 

九、訓練

 這部分在前面都分析過了,看看就行了。

 實現 DeepQNetwork 類中的 def train(self)函數:

    def train(self):
        """
        訓練。
        :return:
        """
        # 初始化當前狀態。
        current_state = np.random.randint(0, self.action_num - 1)
        self.epsilon = self.INITIAL_EPSILON

        while True:
            # 選擇動作。
            action = self.select_action(current_state)

            # 執行動作,得到:下一個狀態,執行動作的得分,是否結束。
            next_state, reward, done = self.step(current_state, action)

            # 保存記憶。
            self.save_store(current_state, action, reward, next_state, done)

            # 先觀察一段時間累積足夠的記憶在進行訓練。
            if self.step_index > self.OBSERVE:
                self.experience_replay()

            if self.step_index > 10000:
                break

            if done:
                current_state = np.random.randint(0, self.action_num - 1)
            else:
                current_state = next_state

            self.step_index += 1

 

十、執行並測試訓練結果

 

 實現 DeepQNetwork 類中的 def pay(self)函數:

    def pay(self):
        """
        運行並測試。
        :return:
        """
        self.train()

        # 顯示 R 矩陣。
        print(self.r)

        for index in range(5):

            start_room = index

            print("#############################", "Agent 在", start_room, "開始行動", "#############################")

            current_state = start_room

            step = 0

            target_state = 5

            while current_state != target_state:
                out_result = self.session.run(self.q_eval, feed_dict={
                    self.q_eval_input: self.state_list[current_state:current_state + 1]})

                next_state = np.argmax(out_result[0])

                print("Agent 由", current_state, "號房間移動到了", next_state, "號房間")

                current_state = next_state

                step += 1

            print("Agent 在", start_room, "號房間開始移動了", step, "步到達了目標房間 5")

            print("#############################", "Agent 在", 5, "結束行動", "#############################")

 

十一、完整源碼

 

import tensorflow as tf
import numpy as np
from collections import deque
import random


class DeepQNetwork:
    r = np.array([[-1, -1, -1, -1, 0, -1],
                  [-1, -1, -1, 0, -1, 100.0],
                  [-1, -1, -1, 0, -1, -1],
                  [-1, 0, 0, -1, 0, -1],
                  [0, -1, -1, 1, -1, 100],
                  [-1, 0, -1, -1, 0, 100],
                  ])

    # 執行步數。
    step_index = 0

    # 狀態數。
    state_num = 6

    # 動作數。
    action_num = 6

    # 訓練之前觀察多少步。
    OBSERVE = 1000.

    # 選取的小批量訓練樣本數。
    BATCH = 20

    # epsilon 的最小值,當 epsilon 小於該值時,將不在隨機選擇行為。
    FINAL_EPSILON = 0.0001

    # epsilon 的初始值,epsilon 逐漸減小。
    INITIAL_EPSILON = 0.1

    # epsilon 衰減的總步數。
    EXPLORE = 3000000.

    # 探索模式計數。
    epsilon = 0

    # 訓練步數統計。
    learn_step_counter = 0

    # 學習率。
    learning_rate = 0.001

    # γ經驗折損率。
    gamma = 0.9

    # 記憶上限。
    memory_size = 5000

    # 當前記憶數。
    memory_counter = 0

    # 保存觀察到的執行過的行動的存儲器,即:曾經經歷過的記憶。
    replay_memory_store = deque()

    # 生成一個狀態矩陣(6 X 6),每一行代表一個狀態。
    state_list = None

    # 生成一個動作矩陣。
    action_list = None

    # q_eval 網絡。
    q_eval_input = None
    action_input = None
    q_target = None
    q_eval = None
    predict = None
    loss = None
    train_op = None
    cost_his = None
    reward_action = None

    # tensorflow 會話。
    session = None

    def __init__(self, learning_rate=0.001, gamma=0.9, memory_size=5000):
        self.learning_rate = learning_rate
        self.gamma = gamma
        self.memory_size = memory_size

        # 初始化成一個 6 X 6 的狀態矩陣。
        self.state_list = np.identity(self.state_num)

        # 初始化成一個 6 X 6 的動作矩陣。
        self.action_list = np.identity(self.action_num)

        # 創建神經網絡。
        self.create_network()

        # 初始化 tensorflow 會話。
        self.session = tf.InteractiveSession()

        # 初始化 tensorflow 參數。
        self.session.run(tf.initialize_all_variables())

        # 記錄所有 loss 變化。
        self.cost_his = []

    def create_network(self):
        """
        創建神經網絡。
        :return:
        """
        self.q_eval_input = tf.placeholder(shape=[None, self.state_num], dtype=tf.float32)
        self.action_input = tf.placeholder(shape=[None, self.action_num], dtype=tf.float32)
        self.q_target = tf.placeholder(shape=[None], dtype=tf.float32)

        neuro_layer_1 = 3
        w1 = tf.Variable(tf.random_normal([self.state_num, neuro_layer_1]))
        b1 = tf.Variable(tf.zeros([1, neuro_layer_1]) + 0.1)
        l1 = tf.nn.relu(tf.matmul(self.q_eval_input, w1) + b1)

        w2 = tf.Variable(tf.random_normal([neuro_layer_1, self.action_num]))
        b2 = tf.Variable(tf.zeros([1, self.action_num]) + 0.1)
        self.q_eval = tf.matmul(l1, w2) + b2

        # 取出當前動作的得分。
        self.reward_action = tf.reduce_sum(tf.multiply(self.q_eval, self.action_input), reduction_indices=1)
        self.loss = tf.reduce_mean(tf.square((self.q_target - self.reward_action)))
        self.train_op = tf.train.GradientDescentOptimizer(self.learning_rate).minimize(self.loss)

        self.predict = tf.argmax(self.q_eval, 1)

    def select_action(self, state_index):
        """
        根據策略選擇動作。
        :param state_index: 當前狀態。
        :return:
        """
        current_state = self.state_list[state_index:state_index + 1]

        if np.random.uniform() < self.epsilon:
            current_action_index = np.random.randint(0, self.action_num)
        else:
            actions_value = self.session.run(self.q_eval, feed_dict={self.q_eval_input: current_state})
            action = np.argmax(actions_value)
            current_action_index = action

        # 開始訓練后,在 epsilon 小於一定的值之前,將逐步減小 epsilon。
        if self.step_index > self.OBSERVE and self.epsilon > self.FINAL_EPSILON:
            self.epsilon -= (self.INITIAL_EPSILON - self.FINAL_EPSILON) / self.EXPLORE

        return current_action_index

    def save_store(self, current_state_index, current_action_index, current_reward, next_state_index, done):
        """
        保存記憶。
        :param current_state_index: 當前狀態 index。
        :param current_action_index: 動作 index。
        :param current_reward: 獎勵。
        :param next_state_index: 下一個狀態 index。
        :param done: 是否結束。
        :return:
        """
        current_state = self.state_list[current_state_index:current_state_index + 1]
        current_action = self.action_list[current_action_index:current_action_index + 1]
        next_state = self.state_list[next_state_index:next_state_index + 1]
        # 記憶動作(當前狀態, 當前執行的動作, 當前動作的得分,下一個狀態)。
        self.replay_memory_store.append((
            current_state,
            current_action,
            current_reward,
            next_state,
            done))

        # 如果超過記憶的容量,則將最久遠的記憶移除。
        if len(self.replay_memory_store) > self.memory_size:
            self.replay_memory_store.popleft()

        self.memory_counter += 1

    def step(self, state, action):
        """
        執行動作。
        :param state: 當前狀態。
        :param action: 執行的動作。
        :return:
        """
        reward = self.r[state][action]

        next_state = action

        done = False

        if action == 5:
            done = True

        return next_state, reward, done

    def experience_replay(self):
        """
        記憶回放。
        :return:
        """
        # 隨機選擇一小批記憶樣本。
        batch = self.BATCH if self.memory_counter > self.BATCH else self.memory_counter
        minibatch = random.sample(self.replay_memory_store, batch)

        batch_state = None
        batch_action = None
        batch_reward = None
        batch_next_state = None
        batch_done = None

        for index in range(len(minibatch)):
            if batch_state is None:
                batch_state = minibatch[index][0]
            elif batch_state is not None:
                batch_state = np.vstack((batch_state, minibatch[index][0]))

            if batch_action is None:
                batch_action = minibatch[index][1]
            elif batch_action is not None:
                batch_action = np.vstack((batch_action, minibatch[index][1]))

            if batch_reward is None:
                batch_reward = minibatch[index][2]
            elif batch_reward is not None:
                batch_reward = np.vstack((batch_reward, minibatch[index][2]))

            if batch_next_state is None:
                batch_next_state = minibatch[index][3]
            elif batch_next_state is not None:
                batch_next_state = np.vstack((batch_next_state, minibatch[index][3]))

            if batch_done is None:
                batch_done = minibatch[index][4]
            elif batch_done is not None:
                batch_done = np.vstack((batch_done, minibatch[index][4]))

        # q_next:下一個狀態的 Q 值。
        q_next = self.session.run([self.q_eval], feed_dict={self.q_eval_input: batch_next_state})

        q_target = []
        for i in range(len(minibatch)):
            # 當前即時得分。
            current_reward = batch_reward[i][0]

            # # 游戲是否結束。
            # current_done = batch_done[i][0]

            # 更新 Q 值。
            q_value = current_reward + self.gamma * np.max(q_next[0][i])

            # 當得分小於 -1 時,表示走了不可走的位置。
            if current_reward <= -1:
                q_target.append(current_reward)
            else:
                q_target.append(q_value)

        _, cost, reward = self.session.run([self.train_op, self.loss, self.reward_action],
                                           feed_dict={self.q_eval_input: batch_state,
                                                      self.action_input: batch_action,
                                                      self.q_target: q_target})

        self.cost_his.append(cost)

        # if self.step_index % 1000 == 0:
        #     print("loss:", cost)

        self.learn_step_counter += 1

    def train(self):
        """
        訓練。
        :return:
        """
        # 初始化當前狀態。
        current_state = np.random.randint(0, self.action_num - 1)
        self.epsilon = self.INITIAL_EPSILON

        while True:
            # 選擇動作。
            action = self.select_action(current_state)

            # 執行動作,得到:下一個狀態,執行動作的得分,是否結束。
            next_state, reward, done = self.step(current_state, action)

            # 保存記憶。
            self.save_store(current_state, action, reward, next_state, done)

            # 先觀察一段時間累積足夠的記憶在進行訓練。
            if self.step_index > self.OBSERVE:
                self.experience_replay()

            if self.step_index > 10000:
                break

            if done:
                current_state = np.random.randint(0, self.action_num - 1)
            else:
                current_state = next_state

            self.step_index += 1

    def pay(self):
        """
        運行並測試。
        :return:
        """
        self.train()

        # 顯示 R 矩陣。
        print(self.r)

        for index in range(5):

            start_room = index

            print("#############################", "Agent 在", start_room, "開始行動", "#############################")

            current_state = start_room

            step = 0

            target_state = 5

            while current_state != target_state:
                out_result = self.session.run(self.q_eval, feed_dict={
                    self.q_eval_input: self.state_list[current_state:current_state + 1]})

                next_state = np.argmax(out_result[0])

                print("Agent 由", current_state, "號房間移動到了", next_state, "號房間")

                current_state = next_state

                step += 1

            print("Agent 在", start_room, "號房間開始移動了", step, "步到達了目標房間 5")

            print("#############################", "Agent 在", 5, "結束行動", "#############################")


if __name__ == "__main__":
    q_network = DeepQNetwork()
    q_network.pay()

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM