無人機輔助移動邊緣計算的計算卸載優化:一種深度確定性策略梯度方法(6)——代碼實現
參考連接:
[1] Wang Y , Fang W , Ding Y , et al. Computation offloading optimization for UAV-assisted mobile edge computing: a deep deterministic policy gradient approach[J]. Wireless Networks, 2021:1-16.doi:https://doi.org/10.1007/s11276-021-02632-z
https://morvanzhou.github.io/tutorials/
https://github.com/fangvv/UAV-DDPG/blob/main/DDPG/DDPG_without_behavior_noise/ddpg_algo.py
https://blog.csdn.net/weixin_43835470/article/details/120881273
6 代碼實現
本部分主要包括 Actor Critc,DDPG,DQN,Edge_only,Local_only 算法的基本實現思路,具體細節不予展示。
6.1 問題環境
問題環境主要包括了環境的各種狀態 s 、無人機 uav 相關參數、用戶 ue 相關參數和一些影響變量。其主要功能是模擬整個實驗環境。主要動作包括了無人機飛行、無人機運算、用戶移動和用戶本地運算。
6.1.1 參數設計
參數包括了場地大小,無人機位置,帶寬,噪聲功率等參數,包括模擬實驗中的所有參數。
#################### uav ####################
height = ground_length = ground_width = 100 # 場地長寬均為100m,UAV飛行高度也是
sum_task_size = 60 * 1048576 # 總計算任務60 Mbits
loc_uav = [50, 50] #無人機的初始位置
bandwidth_nums = 1
# 1MHz = 10^6Hz
B = bandwidth_nums * 10 ** 6 # 帶寬1MHz
p_noisy_los = 10 ** (-13) # 噪聲功率-100dBm ****
p_noisy_nlos = 10 ** (-11) # 噪聲功率-80dBm ****
flight_speed = 50. # 飛行速度50m/s
f_ue = 6e8 # UE的計算頻率0.6GHz
f_uav = 12e8 # UAV的計算頻率1.2GHz
r = 10 ** (-27) # 芯片結構對cpu處理的影響因子
s = 1000 # 單位bit處理所需cpu圈數1000
p_uplink = 0.1 # 上行鏈路傳輸功率0.1W
# alpha0 = -30 # 距離為1m時的參考信道增益-30dB
alpha0 = 1e-5 # 距離為1m時的參考信道增益-50dB = 1e-5 ****
T = 200 # 周期200s
delta_t = 5 # 1s飛行, 后4s用於懸停計算
slot_num = int(T / delta_t) # 40個間隔
m_uav = 9.65 # uav質量/kg
e_battery_uav = 500000 # uav電池電量: 500kJ. ref: Mobile Edge Computing via a UAV-Mounted Cloudlet: Optimization of Bit Allocation and Path Planning
#################### ues ####################
M = 4 # UE數量
block_flag_list = np.random.randint(0, 2, M) # 4個ue,ue的遮擋情況
loc_ue_list = np.random.randint(0, 101, size=[M, 2]) # 位置信息:x在0-100隨機
# task_list = np.random.randint(1048576, 2097153, M) # 隨機計算任務1~2Mbits
task_list = np.random.randint(1572864, 2097153, M) # 隨機計算任務1.5~2Mbits
# ue位置轉移概率
# 0:位置不變; 1:x+1,y; 2:x,y+1; 3:x-1,y; 4:x,y-1
# 60%概率原地不動,10%概率向上下左右
loc_ue_trans_pro = np.array([[.6, .1, .1, .1, .1],
[.6, .1, .1, .1, .1],
[.6, .1, .1, .1, .1],
[.6, .1, .1, .1, .1]])
action_bound = [-1, 1] # 對應tahn激活函數
action_dim = 4 # 第一位表示服務的ue id;中間兩位表示飛行角度和距離;后1位表示目前服務於UE的卸載率
state_dim = 4 + M * 4 # uav 剩余電量, uav 位置, 剩余總任務大小, 所有ue 位置, 所有ue 任務大小, 所有ue 遮擋情況
6.1.2 初始化部分
初始化包括 uav 剩余電量, uav 位置, 剩余總任務大小, 所有 ue 位置, 所有 ue 任務大小, 所有 ue 遮擋情況。
def __init__(self):
'''
初始化環境
包括uav 剩余電量, uav 位置, 剩余總任務大小, 所有ue 位置, 所有ue 任務大小, 所有ue 遮擋情況
Returns
-------
None.
'''
# uav battery remain, uav loc, remaining sum task size, all ue loc, all ue task size, all ue block_flag
self.start_state = np.append(self.e_battery_uav, self.loc_uav)
self.start_state = np.append(self.start_state, self.sum_task_size)
self.start_state = np.append(self.start_state, np.ravel(self.loc_ue_list))
self.start_state = np.append(self.start_state, self.task_list)
self.start_state = np.append(self.start_state, self.block_flag_list)
self.state = self.start_state
6.1.3 重置
由於實驗是划分時間段進行的,涉及到每個步驟執行后的重置。
def reset(self):
'''
重置環境
Returns
-------
所有的環境信息
'''
self.reset_env()
# uav battery remain, uav loc, remaining sum task size, all ue loc, all ue task size, all ue block_flag
self.state = np.append(self.e_battery_uav, self.loc_uav)
self.state = np.append(self.state, self.sum_task_size)
self.state = np.append(self.state, np.ravel(self.loc_ue_list))
self.state = np.append(self.state, self.task_list)
self.state = np.append(self.state, self.block_flag_list)
return self._get_obs()
def reset_env(self):
'''
將所有的值設置為默認值
Returns
-------
None.
'''
self.sum_task_size = 100 * 1048576 # 總計算任務60 Mbits
self.e_battery_uav = 500000 # uav電池電量: 500kJ
self.loc_uav = [50, 50]
self.loc_ue_list = np.random.randint(0, 101, size=[self.M, 2]) # 位置信息:x在0-100隨機
self.reset_step()
def reset_step(self):
'''
隨機初始化ue 的計算任務和遮擋情況
Returns
-------
None.
'''
self.task_list = np.random.randint(2621440, 3145729, self.M) # 隨機計算任務1.5~2Mbits -> 1.5~2 2~2.5 2.5~3 3~3.5 3.5~4
self.block_flag_list = np.random.randint(0, 2, self.M) # 4個ue,ue的遮擋情況
6.1.4 每段時間動作
在每個時間片段中進行的動作包括了選擇用戶,無人機飛行,計算任務,算出最大處理時延幾個部分。
選擇用戶的時候可以隨機選擇用戶,也可以類似DQN中根據對應的動作選擇用戶。
無人機飛行時需要計算飛行后的位置以及飛行時的能量消耗。
計算任務的時候需要考慮特殊情況,如計算任務已經完成,那么實驗立刻終止,不進行后續步驟;最后一步的計算任務總量與我們規定的總任務量不一致,那么重置任務大小使其滿足總計算任務量;無人機飛行時飛出場地,需要重新進行該步然后進行懲罰;以及無人機的電量不足以飛行或者計算,那么就將無人機所有電量都用完后,再進行下一片段。
計算最大處理時延后需要更新我們的獎懲,同時考慮用戶移動。
def step(self): # 0: 選擇服務的ue編號 ; 1: 方向theta; 2: 距離d; 3: offloading ratio
step_redo = False #是否重做該步
is_terminal = False #是否終止
ue_id = np.random.randint(0, self.M) #隨機選擇一個用戶
theta = 0 # 角度
offloading_ratio = 0 # ue卸載率
task_size = self.task_list[ue_id] #獲取該用戶的計算任務
block_flag = self.block_flag_list[ue_id] #獲取該用戶的遮擋情況
# 飛行距離
dis_fly = 0 # 1s飛行距離
# 飛行能耗
# delta_t:5s(1s飛行,4s計算) m_uav:uav質量
e_fly = (dis_fly / (self.delta_t * 0.5)) ** 2 * self.m_uav * (
self.delta_t * 0.5) * 0.5 # ref: Mobile Edge Computing via a UAV-Mounted Cloudlet: Optimization of Bit Allocation and Path Planning
# ref:通過無人機裝載的雲計算的移動邊緣計算:位分配和路徑規划的優化
# UAV飛行后的位置
dx_uav = dis_fly * math.cos(theta)
dy_uav = dis_fly * math.sin(theta)
loc_uav_after_fly_x = self.loc_uav[0] + dx_uav
loc_uav_after_fly_y = self.loc_uav[1] + dy_uav
# 服務器計算耗能
# f_uav:UAV的計算頻率 s:單位bit處理所需cpu圈數
t_server = offloading_ratio * task_size / (self.f_uav / self.s) # 在UAV邊緣服務器上計算時延
# r:芯片結構對cpu處理的影響因子
e_server = self.r * self.f_uav ** 3 * t_server # 在UAV邊緣服務器上計算耗能
# 計算任務全部完成
if self.sum_task_size == 0:
is_terminal = True
# file_name = 'output.txt'
# with open(file_name, 'a') as file_obj:
# file_obj.write("\n======== This episode is done ========") # 本episode結束
reward = 0
# 最后一步計算任務和ue的計算任務不匹配
elif self.sum_task_size - self.task_list[ue_id] < 0:
# 將最后一步計算任務改成總計算任務的剩余量
self.task_list = np.ones(self.M) * self.sum_task_size
reward = 0
step_redo = True
# uav位置不對
elif loc_uav_after_fly_x < 0 or loc_uav_after_fly_x > self.ground_width \
or loc_uav_after_fly_y < 0 or loc_uav_after_fly_y > self.ground_length:
reward = -100
step_redo = True
# uav電量不能支持飛行
elif self.e_battery_uav < e_fly:
reward = -100
# uav電量不能支持計算
elif self.e_battery_uav - e_fly < e_server:
reward = -100
# 電量支持飛行,且計算任務合理,且計算任務能在剩余電量內計算
else:
delay = self.com_delay(self.loc_ue_list[ue_id], np.array([loc_uav_after_fly_x, loc_uav_after_fly_y]),
offloading_ratio, task_size, block_flag) # 計算delay
reward = delay #reward設置為時延
# 更新下一時刻狀態
self.e_battery_uav = self.e_battery_uav - e_fly - e_server # uav 剩余電量
self.sum_task_size -= self.task_list[ue_id] # 剩余任務量
for i in range(self.M): # ue隨機移動
tmp = np.random.rand()
if 0.6 < tmp <= 0.7:
self.loc_ue_list[i] += [0, 1]
elif 0.7 < tmp <= 0.8:
self.loc_ue_list[i] += [1, 0]
elif 0.8 < tmp <= 0.9:
self.loc_ue_list[i] += [0, -1]
elif 0.9 < tmp <= 1:
self.loc_ue_list[i] += [-1, 0]
else:
self.loc_ue_list[i] += [0, 0]
# np.clip是一個截取函數,用於截取數組中小於或者大於某值的部分,並使得被截取部分等於固定值。
# np.clip(a, a_min, a_max, out=None):
# a:輸入矩陣;a_min:被限定的最小值,所有比a_min小的數都會強制變為a_min;
# a_max:被限定的最大值,所有比a_max大的數都會強制變為a_max;out:可以指定輸出矩陣的對象,shape與a相同
# 限定了用戶的位置
np.clip(self.loc_ue_list[i], 0, 100)
# self.task_list = np.random.randint(1048576, 2097153, self.M) # ue隨機計算任務1~2Mbits
# 隨機初始化ue 的計算任務和遮擋情況
self.reset_step()
return reward, is_terminal, step_redo
6.1.5 計算最大處理時延
根據用戶位置,無人機位置,ue卸載率,ue的計算任務,ue的遮擋情況這些參數來計算出最大處理時延。
參照下列公式:
def com_delay(self, loc_ue, loc_uav, offloading_ratio, task_size, block_flag):
'''
計算花費
Parameters
----------
loc_ue : 用戶位置
loc_uav : 無人機位置
offloading_ratio : ue卸載率
task_size : ue的計算任務
block_flag : ue的遮擋情況
Returns
-------
時延
'''
# 獲取uav與ue之間的距離
dx = loc_uav[0] - loc_ue[0]
dy = loc_uav[1] - loc_ue[1]
dh = self.height
dist_uav_ue = np.sqrt(dx * dx + dy * dy + dh * dh)
# 噪聲功率
p_noise = self.p_noisy_los
if block_flag == 1:
p_noise = self.p_noisy_nlos
g_uav_ue = abs(self.alpha0 / dist_uav_ue ** 2) # 信道增益
trans_rate = self.B * math.log2(1 + self.p_uplink * g_uav_ue / p_noise) # 上行鏈路傳輸速率bps
t_tr = offloading_ratio * task_size / trans_rate # 上傳時延,1B=8bit
t_edge_com = offloading_ratio * task_size / (self.f_uav / self.s) # 在UAV邊緣服務器上計算時延
t_local_com = (1 - offloading_ratio) * task_size / (self.f_ue / self.s) # 本地計算時延
# 比較上傳到服務器的時間和本地計算的時間,時延越長,返回的值越高
return max([t_tr + t_edge_com, t_local_com])
6.2 狀態標准化
使用狀態歸一化算法對觀測狀態進行預處理,從而更有效地訓練 DNN 。該算法將每個變量的最大值與最小值之差作為尺度因子。所提出的狀態歸一化算法可以很好地解決輸入變量的大小差異問題。
在我們的工作中,變量 \(E_{\text {battery }}(i), \mathbf{q}(i), \mathbf{p}_{1}(i), \ldots, \mathbf{p}_{K}(i), D_{\text {remain }}(i), D_{1}(i), \ldots, D_{K-1}(i)\) 和 \(D_{K-1}(i)\) 在狀態集中處於不同的序列,這可能導致在訓練中出現問題。如算法 1 所示,通過狀態歸一化對這些變量進行歸一化,以防止出現這種問題。在狀態歸一化算法中,我們使用了五個尺度因子。每個因素可以解釋如下。利用縮放因子 \(\gamma_b\) 來縮小無人機電池容量。由於 UAV 和 UE 具有相同的 x 和 y 坐標范圍,我們使用 \(\gamma_x\) 和 \(\gamma_y\) 分別縮小UAV和UE的x和y坐標。我們使用 \(\gamma_{D_{rm}}\) 來縮小整個時間段內剩余的任務,使用 \(\gamma_{D_{UE}}\) 來縮小時間段 i 內每個終端的任務大小。

import numpy as np
from UAV_env import UAVEnv
env = UAVEnv()
M = env.M
class StateNormalization(object):
'''
狀態標准化類
'''
def __init__(self):
self.high_state = np.array(
[5e5, env.ground_length, env.ground_width, 100 * 1048576])
self.high_state = np.append(self.high_state, np.ones(M * 2) * env.ground_length)
self.high_state = np.append(self.high_state, np.ones(M) * 3145728)
self.high_state = np.append(self.high_state, np.ones(M))
self.low_state = np.zeros(20) # uav loc, ue loc, task size, block_flag
self.low_state[len(self.low_state) - 2 * M:len(self.low_state) - M] = np.ones(M) * 2621440
def state_normal(self, state):
state[len(state) - 2 * M: len(state) - M] -= 2621440
res = state / (self.high_state - self.low_state)
return res
6.3 強化學習類
這里使用自己想要的強化學習類來完成對應的策略選擇。以DQN為例:
# Deep Q Network off-policy
class DeepQNetwork:
def __init__(
self,
n_actions,
n_features,
learning_rate=0.1,
reward_decay=0.001,
e_greedy=0.99,
replace_target_iter=200,
memory_size=MEMORY_CAPACITY,
batch_size=BATCH_SIZE,
# e_greedy_increment=8.684615e-05,
output_graph=False,
):
'''
初始化DQN網絡
Parameters
----------
n_actions : TYPE
行為空間大小
n_features : TYPE
環境特征
learning_rate : TYPE, optional
學習率. The default is 0.1.
reward_decay : TYPE, optional
獎勵衰減因子. The default is 0.001.
e_greedy : TYPE, optional
e貪心因子. The default is 0.99.
replace_target_iter : TYPE, optional
每多少次更新target參數. The default is 200.
memory_size : TYPE, optional
記憶庫大小. The default is MEMORY_CAPACITY.
batch_size : TYPE, optional
批處理大小. The default is BATCH_SIZE.
# e_greedy_increment : TYPE, optional
e貪心因子增長幅度. The default is 8.684615e-05.
output_graph : TYPE, optional
是否輸出圖表. The default is False.
Returns
-------
None.
'''
self.n_actions = n_actions
self.n_features = n_features
self.lr = learning_rate
self.gamma = reward_decay
self.epsilon_max = e_greedy
self.replace_target_iter = replace_target_iter
self.memory_size = memory_size
self.batch_size = batch_size
# self.epsilon_increment = e_greedy_increment
# self.epsilon = 0 if e_greedy_increment is not None else self.epsilon_max
self.epsilon = 0.99
# total learning step
self.learn_step_counter = 0
# initialize zero memory [s, a, r, s_]
# 初始化記憶庫
self.memory = np.zeros((MEMORY_CAPACITY, s_dim * 2 + 2), dtype=np.float32) # memory里存放當前和下一個state,動作和獎勵
# consist of [target_net, evaluate_net]
# 構建神經網絡
self._build_net()
# tf.get_collection():從一個集合中取出變量
# tf.GraphKeys 包含所有graph collection中的標准集合名
# tf.GraphKeys.GLOBAL_VARIABLES 則應該是所有的圖變量的集合
t_params = tf.get_collection(tf.GraphKeys.GLOBAL_VARIABLES, scope='target_net')
e_params = tf.get_collection(tf.GraphKeys.GLOBAL_VARIABLES, scope='eval_net')
with tf.variable_scope('hard_replacement'):
self.target_replace_op = [tf.assign(t, e) for t, e in zip(t_params, e_params)]
self.sess = tf.Session()
if output_graph:
# $ tensorboard --logdir=logs
tf.summary.FileWriter("logs/", self.sess.graph)
# 全局變量初始化
self.sess.run(tf.global_variables_initializer())
self.cost_his = []
def _build_net(self):
'''
構建所有的網絡圖
Returns
-------
None.
'''
# ------------------ all inputs ------------------------
self.s = tf.placeholder(tf.float32, [None, self.n_features], name='s') # input State
self.s_ = tf.placeholder(tf.float32, [None, self.n_features], name='s_') # input Next State
self.r = tf.placeholder(tf.float32, [None, ], name='r') # input Reward
self.a = tf.placeholder(tf.int32, [None, ], name='a') # input Action
# 滿足正態分布
w_initializer, b_initializer = tf.random_normal_initializer(0., 0.3), tf.constant_initializer(0.1)
# ------------------ build evaluate_net ------------------
with tf.variable_scope('eval_net'):
# tf.layers.dense():添加一個全連接層
# tf.nn.relu6:計算校正線性6:min(max(features, 0), 6)
e1 = tf.layers.dense(self.s, 100, tf.nn.relu6, kernel_initializer=w_initializer,
bias_initializer=b_initializer, name='e1')
# e2 = tf.layers.dense(e1, 48, tf.nn.relu6, kernel_initializer=w_initializer,
# bias_initializer=b_initializer, name='e2')
e3 = tf.layers.dense(e1, 20, tf.nn.relu, kernel_initializer=w_initializer,
bias_initializer=b_initializer, name='e3')
self.q_eval = tf.layers.dense(e3, self.n_actions, tf.nn.softmax, kernel_initializer=w_initializer,
bias_initializer=b_initializer, name='q')
# ------------------ build target_net ------------------
with tf.variable_scope('target_net'):
t1 = tf.layers.dense(self.s_, 100, tf.nn.relu6, kernel_initializer=w_initializer,
bias_initializer=b_initializer, name='t1')
# t2 = tf.layers.dense(t1, 48, tf.nn.relu6, kernel_initializer=w_initializer,
# bias_initializer=b_initializer, name='t2')
t3 = tf.layers.dense(t1, 20, tf.nn.relu, kernel_initializer=w_initializer,
bias_initializer=b_initializer, name='t3')
self.q_next = tf.layers.dense(t3, self.n_actions, tf.nn.softmax, kernel_initializer=w_initializer,
bias_initializer=b_initializer, name='t4')
with tf.variable_scope('q_target'):
q_target = self.r + self.gamma * tf.reduce_max(self.q_next, axis=1, name='Qmax_s_') # shape=(None, )
self.q_target = tf.stop_gradient(q_target)
with tf.variable_scope('q_eval'):
a_indices = tf.stack([tf.range(tf.shape(self.a)[0], dtype=tf.int32), self.a], axis=1)
self.q_eval_wrt_a = tf.gather_nd(params=self.q_eval, indices=a_indices) # shape=(None, )
with tf.variable_scope('loss'):
self.loss = tf.reduce_mean(tf.squared_difference(self.q_target, self.q_eval_wrt_a, name='TD_error'))
with tf.variable_scope('train'):
# self._train_op = tf.train.RMSPropOptimizer(self.lr).minimize(self.loss)
self._train_op = tf.train.AdamOptimizer(self.lr).minimize(self.loss)
def store_transition(self, s, a, r, s_):
'''
存儲記憶元組
Parameters
----------
s : 當前狀態
a : 動作
r : 獎懲
s_ : 下一狀態
Returns
-------
None.
'''
if not hasattr(self, 'memory_counter'):
self.memory_counter = 0
# np.hstack將參數元組的元素數組按水平方向進行疊加
transition = np.hstack((s, a, [r], s_))
# replace the old memory with new memory
index = self.memory_counter % self.memory_size
self.memory[index, :] = transition
self.memory_counter += 1
def choose_action(self, observation):
'''
根據當前狀態選擇動作
Parameters
----------
observation : 對環境的觀測
Returns
-------
action : 做出的動作
'''
# to have batch dimension when feed into tf placeholder
# 環境狀態
observation = observation[np.newaxis, :]
# e貪心算法
if np.random.uniform() < self.epsilon:
# forward feed the observation and get q value for every actions
actions_value = self.sess.run(self.q_eval, feed_dict={self.s: observation})
action = np.argmax(actions_value)
else:
action = np.random.randint(0, self.n_actions)
return action
def learn(self):
'''
神經網絡訓練
Returns
-------
None.
'''
# check to replace target parameters
if self.learn_step_counter % self.replace_target_iter == 0:
self.sess.run(self.target_replace_op)
print('\ntarget_params_replaced\n')
# sample batch memory from all memory
# 從所有記憶中選取小批量記憶
if self.memory_counter > self.memory_size:
sample_index = np.random.choice(self.memory_size, size=self.batch_size)
else:
sample_index = np.random.choice(self.memory_counter, size=self.batch_size)
batch_memory = self.memory[sample_index, :]
_, cost = self.sess.run(
[self._train_op, self.loss],
feed_dict={
self.s: batch_memory[:, :self.n_features],
self.a: batch_memory[:, self.n_features],
self.r: batch_memory[:, self.n_features + 1],
self.s_: batch_memory[:, -self.n_features:],
})
self.cost_his.append(cost)
# increasing epsilon
# self.epsilon = self.epsilon + self.epsilon_increment if self.epsilon < self.epsilon_max else self.epsilon_max
self.learn_step_counter += 1
