蟻群算法是一種啟發式優化算法,也是一種智能算法、進化計算。和遺傳算法、粒子群算法相比,蟻群算法所優化的內容是拓撲序(或者路徑)的信息素濃度,而遺傳算法、粒子群算法優化的是某一個個體(解向量)。
例如TSP問題,30個城市之間有900個對應關系,30*15/2=435條路徑,在螞蟻經過之后,都留下了信息素,而某一個拓撲序指的是一個解向量,旅行商的回路應是首尾相連的30條路徑,螞蟻在走路的時候會考慮到能走的路徑上的信息素濃度,然后選擇一個拓撲序作為新解。例如任務車間調度問題,假如有8個機器和10個任務,則一共有80個對應關系,每個對應關系看作一個路徑,會有信息素的標記,而解向量是一個長度為10的向量,螞蟻在走路的時候,會選擇每一個任務到所有機器的對應關系,考慮信息素的濃度選擇其中一個,由此選擇10個任務各自放在哪一個機器上,作為新解。
蟻群算法天生適合解決路徑問題、指派問題,效果通常比粒子群等要好。相比於模擬退火算法,計算更穩定。相比於粒子群算法收斂性更好。
對於任務調度問題,需要定義模型對象:
節點和雲任務。節點VM具有節點編號id、兩種資源總量和兩種資源的已占有量。因此其剩余空間是總量capacity減去已占有量supply。
class Cloudlet: def __init__(self, cpu_demand: float, mem_demand: float): self.cpu_demand = cpu_demand self.mem_demand = mem_demand class VM: def __init__(self, vm_id: int, cpu_supply: float, cpu_velocity: float, mem_supply: float, mem_capacity: float): self.id = vm_id self.cpu_supply = cpu_supply self.cpu_velocity = cpu_velocity self.mem_supply = mem_supply self.mem_capacity = mem_capacity
然后定義蟻群算法解決任務調度問題的類。
初始化屬性有雲任務列表,節點列表,螞蟻種群數量,迭代次數,以及人物的路徑數和最優策略記錄。
函數有生成新解的函數gen_topo_jobs(),評估解的負載均衡值和計算適應度evaluate_particle、calculate_fitness,更新信息素update_topo以及蟻群算法主流程schedulet_main。后面逐一講解
import numpy as np from typing import List from matplotlib import pyplot as plt class ACScheduler: def __init__(self, cloudlets, vms, population_number=100, times=500): self.cloudlets = cloudlets self.vms = vms self.cloudlet_num = len(cloudlets) # 任務數量也就是粒子長度 self.machine_number = len(vms) # 機器數量 self.population_number = population_number # 種群數量 self.times = times # 迭代代數 # 表示任務選擇的機器的信息素 self.topo_phs = [[100 for _ in range(self.machine_number)] for _ in range(self.cloudlet_num)] # 最優策略 self.best_topo = None # 生成新的解向量--根據信息素濃度生成 def gen_topo_jobs(self): pass # 與算法無關的函數--評估當前解向量 def evaluate_particle(self, topo_jobs: List[int]) -> int: pass # 與算法無關的函數--計算適應度 def calculate_fitness(self, topo_jobs: List[int]) -> float: pass # 更新信息素 def update_topo(self): pass def scheduler_main(self): pass if __name__ == '__main__': nodes = [VM(0, 0.862, 950, 950, 1719), VM(1, 0.962, 2, 950, 1719), VM(2, 1.062, 2, 1500, 1719)] lets = [Cloudlet(0.15, 50), Cloudlet(0.05, 100), Cloudlet(0.2, 60), Cloudlet(0.01, 70), Cloudlet(0.04, 80), Cloudlet(0.07, 20), Cloudlet(0.14, 150), Cloudlet(0.15, 200), Cloudlet(0.03, 40), Cloudlet(0.06, 90)] ac = ACScheduler(lets, nodes, times=150) res = ac.scheduler_main() i = 0 for _ in ac.best_topo: print("任務:", i, " 放置到機器", ac.best_topo[i], "上執行") i += 1
1. 生成新的解向量gen_topo_jobs(self)
初始化解向量為-1,然后逐個根據信息素的濃度選擇任務的對應的機器,以此生成新的解向量返回
# 生成新的解向量--根據信息素濃度生成 def gen_topo_jobs(self): ans = [-1 for _ in range(self.cloudlet_num)] node_free = [node_id for node_id in range(self.machine_number)] for let in range(self.cloudlet_num): ph_sum = np.sum(list(map(lambda j: self.topo_phs[let][j], node_free))) test_val = 0 rand_ph = np.random.uniform(0, ph_sum) for node_id in node_free: test_val += self.topo_phs[let][node_id] if rand_ph <= test_val: ans[let] = node_id break return ans
2. 評估解向量然后計算適應度
把解向量的策略利用到集群中,然后計算每個節點每個資源的利用率,然后對所有節點取標准差,各個資源的利用率標准差加和作為負載均衡度,越小越均衡。
由於適應度定義為越大越好,因此評估結果取倒數作為適應度。
# 與算法無關的函數--評估當前解向量 def evaluate_particle(self, topo_jobs: List[int]) -> int: cpu_util = np.zeros(self.machine_number) mem_util = np.zeros(self.machine_number) for i in range(len(self.vms)): cpu_util[i] = self.vms[i].cpu_supply mem_util[i] = self.vms[i].mem_supply for i in range(self.cloudlet_num): cpu_util[topo_jobs[i]] += self.cloudlets[i].cpu_demand mem_util[topo_jobs[i]] += self.cloudlets[i].mem_demand for i in range(self.machine_number): if cpu_util[i] > self.vms[i].cpu_velocity: return 100 if mem_util[i] > self.vms[i].mem_capacity: return 100 for i in range(self.machine_number): cpu_util[i] /= self.vms[i].cpu_velocity mem_util[i] /= self.vms[i].mem_capacity return np.std(cpu_util, ddof=1) + np.std(mem_util, ddof=1) # 與算法無關的函數--計算適應度 def calculate_fitness(self, topo_jobs: List[int]) -> float: return 1 / self.evaluate_particle(topo_jobs)
3. 更新信息素:有螞蟻經過的路徑信息素加倍,否則揮發減半
# 更新信息素 def update_topo(self): for i in range(self.cloudlet_num): for j in range(self.machine_number): if j == self.best_topo[i]: self.topo_phs[i][j] *= 2 else: self.topo_phs[i][j] *= 0.5
4. 主流程:迭代求解
def scheduler_main(self): results = [0 for _ in range(self.times)] fitness = 0 for it in range(self.times): best_time = 0 for ant_id in range(self.population_number): topo_jobs = self.gen_topo_jobs() fitness = self.calculate_fitness(topo_jobs) if fitness > best_time: self.best_topo = topo_jobs best_time = fitness assert self.best_topo is not None self.update_topo() results[it] = best_time if it % 10 == 0: print("ACO iter: ", it, " / ", self.times, ", 適應度: ", fitness) plt.plot(range(self.times), results) plt.xlabel("迭代次數") plt.ylabel("適應度") plt.rcParams['font.sans-serif'] = ['KaiTi'] # 指定默認字體 plt.rcParams['axes.unicode_minus'] = False # 解決保存圖像是負號'-'顯示為方塊的問題 plt.title("蟻群算法求解雲任務調度負載均衡問題") # plt.savefig('img/ACScheduler-1.1_0.9-popu100-iter200.png', dpi=300, # format='png') # bbox_inches="tight"解決X軸時間兩個字不被保存的問題 plt.show() return results
測試數據:
if __name__ == '__main__': # 第四組數據data9 nodes = [ VM(0, 0.662, 2, 620, 2223), VM(1, 0.662, 2, 1100, 2223), VM(2, 1.662, 2, 720, 2223), VM(3, 0.662, 2, 1100, 2223), VM(4, 0.662, 2, 620, 2223), VM(5, 0.562, 2, 650, 2223), VM(6, 0.562, 2, 620, 2223), VM(7, 0.462, 2, 440, 2223), # 8 ] lets = [ Cloudlet(0.133364, 272.435810), Cloudlet(0.226357, 141.126392), Cloudlet(0.084122, 7.183883), Cloudlet(0.029290, 96.658838), Cloudlet(0.027560, 247.821058), Cloudlet(0.191912, 80.636804), Cloudlet(0.134658, 220.702279), Cloudlet(0.133052, 163.046071), Cloudlet(0.272010, 253.477271), Cloudlet(0.175000, 19.409176), Cloudlet(0.166933, 140.880123), Cloudlet(0.286495, 71.288800), Cloudlet(0.080714, 354.839232), Cloudlet(0.209842, 211.351191), Cloudlet(0.221753, 249.500490), Cloudlet(0.128952, 81.599575), Cloudlet(0.168469, 122.216016), Cloudlet(0.049628, 135.728968), Cloudlet(0.051167, 230.172949), Cloudlet(0.158938, 135.356776), Cloudlet(0.212047, 202.830773), Cloudlet(0.372328, 13.145747), Cloudlet(0.092549, 130.122476), Cloudlet(0.166031, 97.761267), Cloudlet(0.142820, 45.852985), Cloudlet(0.016367, 189.495519), Cloudlet(0.112156, 173.926518), Cloudlet(0.004466, 156.806505), Cloudlet(0.222208, 62.619918), Cloudlet(0.073526, 232.175486), Cloudlet(0.158527, 178.624649), Cloudlet(0.103075, 133.896667), Cloudlet(0.176026, 156.076929), Cloudlet(0.098275, 136.450544), Cloudlet(0.192065, 33.923922), Cloudlet(0.213519, 134.820860), ] ac = ACScheduler(lets, nodes, times=150) res = ac.scheduler_main() i = 0 for _ in ac.best_topo: print("任務:", i, " 放置到機器", ac.best_topo[i], "上執行") i += 1
求解結果:
ACO iter: 0 / 150 , 適應度: 0.01
ACO iter: 10 / 150 , 適應度: 4.0408235554621665
ACO iter: 20 / 150 , 適應度: 4.754201711827778
ACO iter: 30 / 150 , 適應度: 4.754201711827778
ACO iter: 40 / 150 , 適應度: 4.754201711827778
ACO iter: 50 / 150 , 適應度: 4.754201711827778
ACO iter: 60 / 150 , 適應度: 4.754201711827778
ACO iter: 70 / 150 , 適應度: 4.754201711827778
ACO iter: 80 / 150 , 適應度: 4.754201711827778
ACO iter: 90 / 150 , 適應度: 4.754201711827778
ACO iter: 100 / 150 , 適應度: 4.754201711827778
ACO iter: 110 / 150 , 適應度: 4.754201711827778
ACO iter: 120 / 150 , 適應度: 4.754201711827778
ACO iter: 130 / 150 , 適應度: 4.754201711827778
ACO iter: 140 / 150 , 適應度: 4.754201711827778