任務調度算法


前言

本文介紹了任務調度算的應用場景,算法分析,遺傳算法,國產2個優秀算法框架及實現旅行商問題的缺陷,最后根據遺傳算法原理編碼實現來規避缺陷  

1 應用場景

任務調度時,有多達幾十種調度任務,有的任務不依賴上一條任務,有的任務只有在上一條任務執行完后才能執行,每條任務執行期間設備都可能會移動一段距離,並且設備只會在一個固定的區域移動。任務調度時,如何使設備移動具體最小?

問題延伸一下,這里的任務調度目標是設備移動距離最小,調度目標也有可能是總任務完成時間最短。根據以往經驗,從上一條任務執行到本任務都有一個時間,有開始時刻,結束時刻,可以構建一個n×n二維任務時刻矩陣,求任務完成時間最小值,等等。

上述問題非常典型,比如車間任務調度,巡檢任務調度等等!

2 算法分析

根據需求,設備總移動距離是目標函數,有的任務只有在上一條任務執行完后才能執行,為順序約束。這是一個排列組合問題,如果有n條任務,不考慮順序約束,有n!種組合,15!是1,307,674,368,000種組合,30!是265,252,859,812,191,058,636,308,480,000,000種組合。這么龐大的組合數量,不可能用傳統求和之后再比較得出最短距離。

針對上述分析結論,既然不能用傳統算法,那么就需要用到大數據智能優化搜索算法,這些算法有差分進化算法,遺傳算法,粒子群算法,模擬退火算法,蟻群算法,魚群算法等等。排列組合,路徑規划,是一個典型的旅行商問題,旅行商問題來源於遺傳算法,問題歸結於遺傳算法的旅行商問題

3 遺傳算法

遺傳算法,來源於百度百科

遺傳算法(Genetic Algorithm,GA)最早是由美國的 John holland於20世紀70年代提出,該算法是根據大自然中生物體進化規律而設計提出的。是模擬達爾文生物進化論的自然選擇和遺傳學機理的生物進化過程的計算模型,是一種通過模擬自然進化過程搜索最優解的方法。該算法通過數學的方式,利用計算機仿真運算,將問題的求解過程轉換成類似生物進化中的染色體基因的交叉、變異等過程。在求解較為復雜的組合優化問題時,相對一些常規的優化算法,通常能夠較快地獲得較好的優化結果。遺傳算法已被人們廣泛地應用於組合優化、機器學習、信號處理、自適應控制和人工生命等領域

3.1 簡介

遺傳算法的起源可追溯到20世紀60年代初期。1967年,美國密歇根大學J. Holland教授的學生 Bagley在他的博士論文中首次提出了遺傳算法這一術語,並討論了遺傳算法在博弈中的應用,但早期研究缺乏帶有指導性的理論和計算工具的開拓。1975年, J. Holland等提出了對遺傳算法理論研究極為重要的模式理論,出版了專著《自然系統和人工系統的適配》,在書中系統闡述了遺傳算法的基本理論和方法,推動了遺傳算法的發展。20世紀80年代后,遺傳算法進入興盛發展時期,被廣泛應用於自動控制、生產計划、圖像處理、機器人等研究領域。

3.2 基本框架

3.2.1 編碼

由於遺傳算法不能直接處理問題空間的參數,因此必須通過編碼將要求解的問題表示成遺傳空間的染色體或者個體。這一轉換操作就叫做編碼,也可以稱作(問題的)表示(representation)。  
評估編碼策略常采用以下3個規范:  
a)完備性(completeness):問題空間中的所有點(候選解)都能作為GA空間中的點(染色體)表現。
b)健全性(soundness): GA空間中的染色體能對應所有問題空間中的候選解。
c)非冗余性(nonredundancy):染色體和候選解一一對應。  

3.2.2 適應度函數

進化論中的適應度,是表示某一個體對環境的適應能力,也表示該個體繁殖后代的能力。遺傳算法的適應度函數也叫評價函數,是用來判斷群體中的個體的優劣程度的指標,它是根據所求問題的目標函數來進行評估的。
遺傳算法在搜索進化過程中一般不需要其他外部信息,僅用評估函數來評估個體或解的優劣,並作為以后遺傳操作的依據。由於遺傳算法中,適應度函數要比較排序並在此基礎上計算選擇概率,所以適應度函數的值要取正值。由此可見,在不少場合,將目標函數映射成求最大值形式且函數值非負的適應度函數是必要的。  
適應度函數的設計主要滿足以下條件:
a)單值、連續、非負、最大化  
b) 合理、一致性  
c)計算量小
d)通用性強。
在具體應用中,適應度函數的設計要結合求解問題本身的要求而定。適應度函數設計直接影響到遺傳算法的性能。  

3.2.3 初始群體選取

遺傳算法中初始群體中的個體是隨機產生的。一般來講,初始群體的設定可采取如下的策略:  
a)根據問題固有知識,設法把握最優解所占空間在整個問題空間中的分布范圍,然后,在此分布范圍內設定初始群體。  
b)先隨機生成一定數目的個體,然后從中挑出最好的個體加到初始群體中。這種過程不斷迭代,直到初始群體中個體數達到了預先確定的規模。  

3.3 運算過程

遺傳算法的基本運算過程如下:  
(1)初始化:設置進化代數計數器t=0,設置最大進化代數T,隨機生成M個個體作為初始群體P(0)。  
(2)個體評價:計算群體P(t)中各個個體的適應度。  
(3)選擇運算:將選擇算子作用於群體。選擇的目的是把優化的個體直接遺傳到下一代或通過配對交叉產生新的個體再遺傳到下一代。選擇操作是建立在群體中個體的適應度評估基礎上的。  
(4)交叉運算:將交叉算子作用於群體。遺傳算法中起核心作用的就是交叉算子。  
(5)變異運算:將變異算子作用於群體。即是對群體中的個體串的某些基因座上的基因值作變動。群體P(t)經過選擇、交叉、變異運算之后得到下一代群體P(t+1)。  
(6)終止條件判斷:若t=T,則以進化過程中所得到的具有最大適應度個體作為最優解輸出,終止計算。  
遺傳操作包括以下三個基本遺傳算子(genetic operator):選擇(selection);交叉(crossover);變異(mutation)。  

3.3.1 選擇

從群體中選擇優勝的個體,淘汰劣質個體的操作叫選擇。選擇算子有時又稱為再生算子(reproduction operator)。選擇的目的是把優化的個體(或解)直接遺傳到下一代或通過配對交叉產生新的個體再遺傳到下一代。選擇操作是建立在群體中個體的適應度評估基礎上的,常用的選擇算子有以下幾種:適應度比例方法、隨機遍歷抽樣法、局部選擇法。  

3.3.2 交叉

在自然界生物進化過程中起核心作用的是生物遺傳基因的重組(加上變異)。同樣,遺傳算法中起核心作用的是遺傳操作的交叉算子。所謂交叉是指把兩個父代個體的部分結構加以替換重組而生成新個體的操作。通過交叉,遺傳算法的搜索能力得以飛躍提高。  

3.2.3 變異

變異算子的基本內容是對群體中的個體串的某些基因座上的基因值作變動。依據個體編碼表示方法的不同,可以有以下的算法:  
a)實值變異。  
b)二進制變異。  
一般來說,變異算子操作的基本步驟如下:  
a)對群中所有個體以事先設定的變異概率判斷是否進行變異  
b)對進行變異的個體隨機選擇變異位進行變異。
遺傳算法引入變異的目的有兩個:一是使遺傳算法具有局部的隨機搜索能力。當遺傳算法通過交叉算子已接近最優解鄰域時,利用變異算子的這種局部隨機搜索能力可以加速向最優解收斂。顯然,此種情況下的變異概率應取較小值,否則接近最優解的積木塊會因變異而遭到破壞。二是使遺傳算法可維持群體多樣性,以防止出現未成熟收斂現象。此時收斂概率應取較大值。  

3.2.4 終止條件

當最優個體的適應度達到給定的閾值,或者最優個體的適應度和群體適應度不再上升時,或者迭代次數達到預設的代數時,算法終止。預設的代數一般設置為100-500代。  

3.3 特點

遺傳算法是解決搜索問題的一種通用算法,對於各種通用問題都可以使用。搜索算法的共同特征為:  
(1) 首先組成一組候選解  
(2)依據某些適應性條件測算這些候選解的適應度  
(3)根據適應度保留某些候選解,放棄其他候選解  
(4) 對保留的候選解進行某些操作,生成新的候選解。  
在遺傳算法中,上述幾個特征以一種特殊的方式組合在一起:基於染色體群的並行搜索,帶有猜測性質的選擇操作、交換操作和突變操作。這種特殊的組合方式將遺傳算法與其它搜索算法區別開來。  
遺傳算法還具有以下幾方面的特點:  
(1)算法從問題解的串集開始搜索,而不是從單個解開始。這是遺傳算法與傳統優化算法的極大區別。傳統優化算法是從單個初始值迭代求最優解的;容易誤入局部最優解。遺傳算法從串集開始搜索,覆蓋面大,利於全局擇優。  
(2)遺傳算法同時處理群體中的多個個體,即對搜索空間中的多個解進行評估,減少了陷入局部最優解的風險,同時算法本身易於實現並行化。  
(3)遺傳算法基本上不用搜索空間的知識或其它輔助信息,而僅用適應度函數值來評估個體,在此基礎上進行遺傳操作。適應度函數不僅不受連續可微的約束,而且其定義域可以任意設定。這一特點使得遺傳算法的應用范圍大大擴展。
(4)遺傳算法不是采用確定性規則,而是采用概率的變遷規則來指導他的搜索方向。
(5)具有自組織、自適應和自學習性。遺傳算法利用進化過程獲得的信息自行組織搜索時,適應度大的個體具有較高的生存概率,並獲得更適應環境的基因結構。  
(6)此外,算法本身也可以采用動態自適應技術,在進化過程中自動調整算法控制參數和編碼精度,比如使用模糊自適應法。  

3.4 不足之處

(1)編碼不規范及編碼存在表示的不准確性。  
(2)單一的遺傳算法編碼不能全面地將優化問題的約束表示出來。考慮約束的一個方法就是對不可行解采用閾值,這樣,計算的時間必然增加。  
(3)遺傳算法通常的效率比其他傳統的優化方法低。  
(4)遺傳算法容易過早收斂。  
(5)遺傳算法對算法的精度、可行度、計算復雜性等方面,還沒有有效的定量分析方法。

3.5 應用

由於遺傳算法的整體搜索策略和優化搜索方法在計算時不依賴於梯度信息或其它輔助知識,而只需要影響搜索方向的目標函數和相應的適應度函數,所以遺傳算法提供了一種求解復雜系統問題的通用框架,它不依賴於問題的具體領域,對問題的種類有很強的魯棒性,所以廣泛應用於許多科學,下面我們將介紹遺傳算法的一些主要應用領域  

3.5.1 函數優化

函數優化是遺傳算法的經典應用領域,也是遺傳算法進行性能評價的常用算例,許多人構造出了各種各樣復雜形式的測試函數:連續函數和離散函數、凸函數和凹函數、低維函數和高維函數、單峰函數和多峰函數等。對於一些非線性、多模型、多目標的函數優化問題,用其它優化方法較難求解,而遺傳算法可以方便的得到較好的結果。  

3.5.2 組合優化

隨着問題規模的增大,組合優化問題的搜索空間也急劇增大,有時在計算上用枚舉法很難求出最優解。對這類復雜的問題,人們已經意識到應把主要精力放在尋求滿意解上,而遺傳算法是尋求這種滿意解的最佳工具之一。實踐證明,遺傳算法對於組合優化中的NP問題非常有效。例如遺傳算法已經在求解旅行商問題、 背包問題、裝箱問題、圖形划分問題等方面得到成功的應用。  
此外,GA也在生產調度問題、自動控制、機器人學、圖象處理、人工生命、遺傳編碼和機器學習等方面獲得了廣泛的運用。  

3.5.3 車間調度

車間調度問題是一個典型的NP-Hard問題,遺傳算法作為一種經典的智能算法廣泛用於車間調度中,很多學者都致力於用遺傳算法解決車間調度問題,現今也取得了十分豐碩的成果。從最初的傳統車間調度(JSP)問題到柔性作業車間調度問題(FJSP),遺傳算法都有優異的表現,在很多算例中都得到了最優或近優解。  

4 智能搜索算法框架

智能搜索算法主流國產開源框架有geatpy和scikit-opt

4.1 geatpy

4.1.1 開源地址

https://github.com/geatpy-dev/geatpy

4.1.2 簡介

geatpy團隊現由華南農業大學、暨南大學、華南理工大學等優秀碩士以及一批優秀校友、優秀本科生組成。geatpy是一個高性能實用型進化算法工具箱,提供了許多已實現的進化算法各項操作的函數,如初始化種群、選擇、交叉、變異、多目標優化參考點生成、非支配排序、多目標優化GD、IGD、HV等指標的計算等等。沒有繁雜的深度封裝,可以清晰地看到其基本結構及相關算法的實現,並利用geatpy函數方便地進行自由組合,實現和研究多種改進的進化算法、多目標優化、並行進化算法等,解決傳統優化算法難以解決的問題。

geatpy在Python上提供簡便易用的面向對象的進化算法框架。通過繼承問題類,可以輕松把實際問題與進化算法相結合。geatpy還提供多種進化算法模板類,涵蓋遺傳算法、差分進化算法、進化策略、多目標進化優化算法等,可以通過調用算法模板,輕松解決單目標優化、多目標優化、組合優化、約束優化等問題。這些都是開源的,你可以參照這些模板,加以改進或重構,以便實現效果更好的進化算法,或是讓進化算法更好地與當前正在進行的項目進行融合。

4.1.3 旅行商問題求解

import numpy as np
import geatpy as ea
class MyProblem(ea.Problem):  # 繼承Problem父類
    def __init__(self):
        name = 'MyProblem'  # 初始化name(函數名稱,可以隨意設置)
        M = 1  # 初始化M(目標維數)
        maxormins = [1]  # 初始化maxormins(目標最小最大化標記列表,1:最小化該目標;-1:最大化該目標)
        Dim = 9  # 初始化Dim(決策變量維數)
        varTypes = [1] * Dim  # 初始化varTypes(決策變量的類型,元素為0表示對應的變量是連續的;1表示是離散的)
        lb = [1] * Dim  # 決策變量下界
        ub = [9] * Dim  # 決策變量上界
        lbin = [1] * Dim  # 決策變量下邊界(0表示不包含該變量的下邊界,1表示包含)
        ubin = [1] * Dim  # 決策變量上邊界(0表示不包含該變量的上邊界,1表示包含)
        # 調用父類構造方法完成實例化
        ea.Problem.__init__(self, name, M, maxormins, Dim, varTypes, lb, ub, lbin, ubin)
        # 新增一個屬性存儲旅行地坐標
        self.places = np.array([[0.4, 0.4439],
                                [0.2439, 0.1463],
                                [0.1707, 0.2293],
                                [0.2293, 0.761],
                                [0.5171, 0.9414],
                                [0.8732, 0.6536],
                                [0.6878, 0.5219],
                                [0.8488, 0.3609],
                                [0.6683, 0.2536],
                                [0.6195, 0.2634]])

    def aimFunc(self, pop):  # 目標函數
        x = pop.Phen  # 得到決策變量矩陣
        X = np.hstack([np.zeros((x.shape[0], 1)), x, np.zeros((x.shape[0], 1))]).astype(int) # 添加從0地出發且最后回到出發地
        ObjV = []  # 存儲所有種群個體對應的總路程
        for i in range(X.shape[0]):
            journey = self.places[X[i], :]  # 按既定順序到達的地點坐標
            distance = np.sum(np.sqrt(np.sum(np.diff(journey.T) ** 2, 0)))  # 計算總路程
            ObjV.append(distance)
        pop.ObjV = np.array([ObjV]).T  # 把求得的目標函數值賦值給種群pop的ObjV
        # 找到違反約束條件的個體在種群中的索引,保存在向量exIdx中(如:若0、2、4號個體違反約束條件,則編程找出他們來)
        exIdx1 = np.where(np.where(x == 3)[1] - np.where(x == 6)[1] < 0)[0]
        exIdx2 = np.where(np.where(x == 4)[1] - np.where(x == 5)[1] < 0)[0]
        exIdx = np.unique(np.hstack([exIdx1, exIdx2]))
        pop.CV = np.zeros((pop.sizes, 1))
        pop.CV[exIdx] = 1  # 把求得的違反約束程度矩陣賦值給種群pop的CV

import matplotlib.pyplot as plt

if __name__ == '__main__':
    """================================實例化問題對象============================"""
    problem = MyProblem()  # 生成問題對象
    """==================================種群設置=============================="""
    Encoding = 'P'  # 編碼方式,采用排列編碼
    NIND = 50  # 種群規模
    Field = ea.crtfld(Encoding, problem.varTypes, problem.ranges, problem.borders)  # 創建區域描述器
    population = ea.Population(Encoding, Field, NIND)  # 實例化種群對象(此時種群還沒被初始化,僅僅是完成種群對象的實例化)
    """================================算法參數設置============================="""
    myAlgorithm = ea.soea_SEGA_templet(problem, population)  # 實例化一個算法模板對象
    myAlgorithm.MAXGEN = 200  # 最大進化代數
    myAlgorithm.mutOper.Pm = 0.5  # 變異概率
    myAlgorithm.logTras = 1  # 設置每隔多少代記錄日志,若設置成0則表示不記錄日志
    myAlgorithm.verbose = True  # 設置是否打印輸出日志信息
    myAlgorithm.drawing = 1  # 設置繪圖方式(0:不繪圖;1:繪制結果圖;2:繪制目標空間過程動畫;3:繪制決策空間過程動畫)
    """===========================調用算法模板進行種群進化========================"""
    [BestIndi, population] = myAlgorithm.run()  # 執行算法模板,得到最優個體以及最后一代種群
    BestIndi.save()  # 把最優個體的信息保存到文件中
    """==================================輸出結果=============================="""
    print('評價次數:%s' % myAlgorithm.evalsNum)
    print('時間已過 %s 秒' % myAlgorithm.passTime)
    if BestIndi.sizes != 0:
        print('最短路程為:%s' % BestIndi.ObjV[0][0])
        print('最佳路線為:')
        best_journey = np.hstack([0, BestIndi.Phen[0, :], 0])
        for i in range(len(best_journey)):
            print(int(best_journey[i]), end=' ')
        # 繪圖
        plt.figure()
        plt.plot(problem.places[best_journey.astype(int), 0], problem.places[best_journey.astype(int), 1], c='black')
        plt.plot(problem.places[best_journey.astype(int), 0], problem.places[best_journey.astype(int), 1], 'o',c='black')
        for i in range(len(best_journey)):
            plt.text(problem.places[int(best_journey[i]), 0], problem.places[int(best_journey[i]), 1],
                     chr(int(best_journey[i]) + 65), fontsize=20)
        plt.grid(True)
        plt.title('連線圖')
        plt.xlabel('經度')
        plt.ylabel('經度')
        plt.savefig('roadmap.svg', dpi=600, bbox_inches='tight')
    else:
        print('沒找到可行解。')

 

4.2 scikit-opt

4.2.1 開源地址

https://github.com/guofei9987/scikit-opt/

4.2.2 簡介

是一款個人維護,容易上手,github上引用多,文檔齊全,接近Python原生語法的智能優化群分算法

4.2.3 旅行商問題求解

import numpy as np
from scipy import spatial
import matplotlib.pyplot as plt
font = {'family':'SimHei','weight':'bold','size':'9'}
plt.rc('font',**font)
plt.rc('axes',unicode_minus = False) #解決負號顯示問題
num_points = 10
def task_distance(num_points,minValue, maxValue):
    li = np.array([[7,21,11,28,3,23,28,3,19,23],
                   [11,3,2,17,12,30,21,18,25,5],
                   [6,28,24,11,27,27,7,13,25,10],
                   [26,21,12,11,16,17,18,14,25,19],
                   [7,9,8,18,25,17,23,26,21,27],
                   [30,11,26,2,13,8,3,20,5,25],
                   [8,24,10,10,5,26,5,27,3,16],
                   [28,23,28,11,22,2,15,11,13,16],
                   [17,10,30,3,7,22,23,30,4,2],
                   [16,29,10,28,8,10,2,8,10,2]])
    return li
def cal_total_distance(routine):
    num_points, = routine.shape
    sum =0;
    for i in range(num_points-1):
        sum += distance_matrix[routine[i],routine[i+1]]
    return sum  

from sko.GA import GA_TSP
points_coordinate = task_distance(num_points,2,30)  # generate coordinate of points
distance_matrix = points_coordinate
ga_tsp = GA_TSP(func=cal_total_distance, n_dim=num_points, size_pop=50, max_iter=20, prob_mut=1)
best_points, best_distance = ga_tsp.run()
print("best_points is ",best_points)
print("best_distance is ", best_distance)

#畫圖 
best_points_ = best_points
best_points_coordinate = points_coordinate[best_points_, :]
plt.figure(1)
plt.subplot(221,title='連線圖')
plt.xlabel("經度")
plt.ylabel("緯度")
plt.plot(best_points_coordinate[:, 0], best_points_coordinate[:, 1], 'o-r')
plt.subplot(222,title='收斂圖')
plt.xlabel("迭代次數")
plt.ylabel("最短距離")
plt.plot(ga_tsp.generation_best_Y)
plt.show()

 

4.3 旅行商問題開源框架缺陷

分析2種算法框架,有運行速度快,准確度高等特點,單旅行商無約束模型這2個框架都非常適合。

缺點也很明顯,實際應用中,有順序約束,也有幾個車間任務,幾個巡檢任務同時調度,即多旅行商有約束的情況,這些框架算法就很難實現,甚至不支持,需要自行編碼實現多旅行商有約束的遺傳算法

5 自行編碼實現任務調度

根據遺傳算法原理,自行編碼實現遺傳算法,實現任務調度

5.1 旅行商類

 

import pandas as pd
import numpy as np
import matplotlib.pyplot as plt font = {'family':'SimHei','weight':'bold','size':'9'} plt.rc('font',**font) plt.rc('axes',unicode_minus = False) #解決負號顯示問題 class TPS(object): pop_size = 50 #種群大小 c_rate = 0.7 #交叉率 m_rate = 0.05 #突變率 pop = np.array([])#種群數量 fitness = np.array([]) #適應度數組 ga_num = 200 #最大迭代次數 best_distance = 1 #記錄目前最優距離 best_gene = [] #記錄目前最優任務調度方案 tasks = np.array([]) task_size = 0 #標記任務數目 tasks_distance = np.array([[[]]]) task_distance_start = 1 #任務開始位置 task_cons_order = np.array([[]])#順序約束 task_cons_no = np.array([])
def __init__(self,c_rate,m_rate,pop_size,ga_num,start_pos,task_list,task_cons_order,tasks_cons_number): self.fitness = np.zeros(self.pop_size) self.c_rate = c_rate self.m_rate = m_rate self.pop_size = pop_size self.ga_num = ga_num self.task_distance_start = start_pos self.tasks_distance = np.array(task_list) self.task_size = self.tasks_distance.shape[1] self.tasks = np.arange(self.task_size) self.task_cons_order = np.array(task_cons_order) self.task_cons_no = np.array(tasks_cons_number) def init(self): self.pop = self.creat_pop(self.pop_size) self.fitness = self.get_fitness(self.pop) #計算種群適應度 def creat_pop(self,size): pop = [] for i in range(size): gene = self.creat_gene() #創建基因 pop.append(gene) #加入種群 return np.array(pop) def creat_gene(self,cons_order = True): gene = np.arange(self.task_size) # 問題的解,基因組合,種群中的個體 np.random.shuffle(gene) #打亂基因組合 if cons_order: gene = self.constraint_order(gene) #順序約束 return gene def constraint_order(self,gene):#順序約束 if len(self.task_cons_order)==0 or len(self.task_cons_order[0])==0:#無約束直接返回 return gene for i,elem_one in enumerate(gene):#選擇排序 if gene[i] not in self.task_cons_no: continue for j,elem_two in enumerate(gene): if i==j: continue chage = self.change_order(gene[i],gene[j])#判斷是否要調整順序 if chage: gene[i],gene[j] = gene[j],gene[i] return gene def change_order(self,begin,end):#交換順序 result,cycle = get_link_next(self.task_cons_order,begin,end) return result def get_fitness(self,pop): d = np.array([]) #適應度記錄數組 for i in range(pop.shape[0]): gene = pop[i] #取個體 distance = self.get_distance(gene)#計算個體基因的優劣 distance = self.best_distance/distance #當前最優距離除以個體距離,越近適應度越高,最優適應度為1 d= np.append(d,distance) #保存種群中每個個體適應度 return d def get_distance(self,gene): s = 0 for i in range(len(gene)): if i==0: start_task = self.tasks_distance[0][gene[i]] start_distance_one = self.task_distance_start - start_task[0] start_distance_two = start_task[0]-start_task[1] s+= np.abs(start_distance_one) + np.abs(start_distance_two) else: s+= self.caculate_distance(gene[i-1],gene[i]) return s def caculate_distance(self,i,j): distance_one = self.tasks_distance[0][i][1]-self.tasks_distance[0][j][0] distance_two = self.tasks_distance[0][j][0]-self.tasks_distance[0][j][1] distance = np.abs(distance_one) + np.abs(distance_two) return distance def select_pop(self,pop): #低於平均的要替換改變 best_f_index = np.argmax(self.fitness) av = np.median(self.fitness, axis = 0) for i in range(self.pop_size): if i!=best_f_index and self.fitness[i] < av: pi = self.cross(pop[best_f_index],pop[i]) pi = self.mutate(pi) pop[i,:] = pi[:] return pop;

def cross(self,parent1,parent2):#交叉 if np.random.rand()>self.c_rate: return parent1 index1 = np.random.randint(0,self.task_size-1) index2 = np.random.randint(index1,self.task_size-1) tempGene = parent2[index1:index2] #交叉基因片段 newGene = [] pllen =0 for g in parent1: if pllen == index1: newGene.extend(tempGene) #插入基因片段 if g not in tempGene: newGene.append(g) pllen+=1 newGene = np.array(newGene) if newGene.shape[0]!=self.task_size: print('c error') return self.creat_pop(1) #newGene = self.constraint_order(newGene) #順序約束 return newGene def mutate(self,gene):#變異 if np.random.rand()> self.m_rate: gene = self.constraint_order(gene) #順序約束 return gene index1 = np.random.randint(0,self.task_size-1) index2 = np.random.randint(index1,self.task_size-1) newGene = self.reverse_gene(gene,index1,index2) if newGene.shape[0]!=self.task_size: print('m error') return self.creat_pop(1) newGene = self.constraint_order(newGene) #順序約束 return newGene def reverse_gene(self,gene,i,j):#反轉 #翻轉基因中i到j之間的基因片段 if i>=j: return gene if j>self.task_size-1: return gene parent1 = np.copy(gene) tempGene = parent1[i:j] newGene = [] pllen = 0 for g in parent1: if pllen==i: newGene.extend(tempGene[::-1]) #插入基因片段 if g not in tempGene: newGene.append(g) pllen+=1 return np.array(newGene) def evelution(self): #主程序:迭代進化種群 best_iteration = 0; best_gene = [] best_distance =0 tsp = self x_axis = [] y_axis = [] for i in range(self.ga_num): best_f_index = np.argmax(tsp.fitness) worst_f_index = np.argmin(tsp.fitness) local_best_gene = tsp.pop[best_f_index] local_best_distance = tsp.get_distance(local_best_gene) if i ==0: tsp.best_gene = local_best_gene tsp.best_distance = tsp.get_distance(local_best_gene) best_gene = tsp.best_gene best_distance = tsp.best_distance if local_best_distance < best_distance: best_iteration = i tsp.best_distance = local_best_distance #記錄最優值 tsp.best_gene = local_best_gene #記錄最優個體基因 best_gene = tsp.best_gene best_distance = tsp.best_distance else: tsp.pop[worst_f_index] = tsp.cross(self.creat_gene(cons_order = False),best_gene) tsp.pop[worst_f_index] = tsp.mutate(tsp.pop[worst_f_index]) this_best_distance = self.get_distance(self.best_gene) x_axis.append(i+1) y_axis.append(this_best_distance) print('當前迭代:%d 全局最優值:%s 本次最優值:%s 本次最優染色體:%s'%(i+1,best_distance,this_best_distance,self.best_gene)) tsp.pop = tsp.select_pop(tsp.pop) #選擇淘汰種群 #創建新種群 best_gene_number = 0 #保留一條最優解 for j in range(self.pop_size): best_gene_equal = (tsp.pop[j] == best_gene).all() if best_gene_equal: best_gene_number+=1 #print("第%d次迭代包含相等的最優距離,個數%d"%(i+1,best_gene_number)) if best_gene_equal==False or best_gene_number>1: r = np.random.randint(0,self.pop_size-1) if j!=r: tsp.pop[j] == tsp.cross(tsp.pop[j],tsp.pop[r]) #交叉種群中第j,r個體的基因 tsp.pop[j] = tsp.mutate(tsp.pop[j]) #突變種群中第j個個體基因 self.fitness = self.get_fitness(self.pop) #計算種群適應度
plt.xlabel("迭代次數") plt.ylabel("最短距離") plt.title("收斂圖(第%d次迭代出現最短距離%d)"%(best_iteration+1,best_distance)) plt.plot(x_axis,y_axis) plt.show() print('最優迭代:%d 最優值:%s 最優染色體:%s'%(best_iteration+1,best_distance,best_gene)) best_gene_str = self.get_gene_str(best_gene) result = "%d|%s"%(best_distance,best_gene_str) print(result) return result def get_gene_str(self,gene): genStr = '' for i in gene: genStr += str(i)+"," return genStr[0:-1]

5.2 輔助函數

def get_task_into(t): #轉為任務列表
    #t=1,1|2,2|3,3|4,4|5,5|6,6|7,7|8,8|9,9|10,10
    task = [] #任務
    task_list = [] #任務列表
    cons_order_list = [] #順序約束列表 
    task_cons_no = []
    for i,elem_one in enumerate(t.split("|")):
        task_bay = []#貝位:開始貝位,結束貝位
        cons_order =[] #順序約束
        for j,elem_two in enumerate(elem_one.split(",")): 
            if j<2:
                task_bay.append(int(elem_two))
            elif j==2 and elem_two!='':
                cons_order.append(i)
                end = int(elem_two)
                cons_order.append(end)
                cons_order_list.append(cons_order)                
                if i not in task_cons_no:
                    task_cons_no.append(i)
                if end not in  task_cons_no:
                    task_cons_no.append(end)                                
        task.append(task_bay)
    task_list.append(task)
    return task_list,cons_order_list,task_cons_no

def dead_cycle(arr):#死循環判斷 arr = np.array([[5,1],[1,3],[3,5]])
    for i in range(len(arr)):
        if len(arr[i])==0:
            continue
        begin,end = arr[i][0],arr[i][1]
        result,cycle = get_link_next(arr,begin,end)
        if result:
            print("發生死循環任務編號%d"%(cycle))
            return result
    return False
                           
def get_link_next(arr,begin,end):#獲取鏈路下一個,返回是否死循環和最后的
    for j in range(len(arr)):
        if len(arr[j])==0 or (begin == arr[j][0] and end == arr[j][1]): #排除本身
            continue
        if end == arr[j][0]: #向后
            if begin == arr[j][1]:
                #print("發生死循環任務指針編號%d,%d"%(begin,end))
                return True,end 
            else: 
                end = arr[j][1]
                result =  get_link_next(arr,begin,end)
                return result
    return False,end

5.3 調用主方法

if __name__ == '__main__':
    #app.run(debug=True, port=8888,host=0.0.0.0) #指定端口、host,0.0.0.0代表不管幾個網卡,任何ip都可以訪問
    start_pos = int(1)
    task_list,cons_order,task_cons_no = get_task_into('1,1|2,2,3,|3,3|4,4|5,5|6,6|7,7|8,8|9,9|10,10|1,1|2,2|3,3|4,4|5,5')
    dead = dead_cycle(np.array(cons_order)) #驗證死循環
    if dead:
        print("發生死循環了")
    else: 
        iteration = len(task_cons_no)
        print("順序約束任務個數數為%d"%(iteration))
        iteration = 16-len(task_cons_no)
        if iteration <=0:
            iteration =1        
        self =  TPS(0.8,0.05,2000,iteration,start_pos,task_list,cons_order,task_cons_no)
        self.init()
        result = self.evelution()

5.4 WISG run_server部署方法

#導入WISG(Web Server Gateway Interface)
from wsgiref.simple_server import make_server
from urllib import parse
import json
import pdb

def getSingleTaskSchedule(environ,start_response):
start_response('200 OK',[('Content-Type','text/html;charset=urf-8')]) #調用urlparse的parse_qs解析URL參數,並返回字典 query_args=environ['QUERY_STRING'] params = parse.parse_qs(query_args) start_pos_value = params['startPos'] task_list_value = params['taskList'] # 獲取get中key為name的值 start_pos = int(str(start_pos_value).replace("[","").replace("]","").replace("'","")) task_list,cons_order,tasks_cons_number = get_task_into(str(task_list_value).replace("[","").replace("]","").replace("'","")) dead = dead_cycle(np.array(cons_order)) #驗證死循環 if dead: result = json.dumps("-9998|任務死循環").encode('utf-8') return [result] else: iteration = len(tasks_cons_number) print("順序約束任務個數數為%d"%(iteration)) iteration = 16-len(tasks_cons_number) if iteration <=0: iteration =1 self = TPS(0.7,0.05,2000,iteration,start_pos,task_list,cons_order,tasks_cons_number) self.init() result = json.dumps(self.evelution()).encode('utf-8') return [result] return [result] def get_url(): data= {'/getSingleTaskSchedule':getSingleCraneSchedule}
return data def run_server(environ,start_response): urls=get_url() url=environ.get("PATH_INFO") print(url) if url in urls: function_data = urls[url](environ,start_response) return function_data else: start_response("200 OK",[('Content-Type','text/html;charset=utf-8')]) result = [bytes("-9999|路由錯誤",encoding="utf-8")] return result httpd =make_server('0.0.0.0',8801,run_server) # ip='0.0.0.0' port=8801 print("server is started, port is 8801....") httpd.serve_forever()

5.5 運行結果

順序約束任務個數數為2
當前迭代:1 全局最優值:28 本次最優值:28 本次最優染色體:[14  4  9  8  7  6  2  3 12 13  5 11 10  1  0]
當前迭代:2 全局最優值:24 本次最優值:24 本次最優染色體:[14  4  9  8  7  6  5  2  3 12 13 11 10  1  0]
當前迭代:3 全局最優值:20 本次最優值:20 本次最優染色體:[14  4  9  8  7  6  5  3 13  2 12 11 10  1  0]
當前迭代:4 全局最優值:18 本次最優值:18 本次最優染色體:[14  4  9  8  7  6  5  3 13  2 12 11  1 10  0]
當前迭代:5 全局最優值:18 本次最優值:18 本次最優染色體:[14  4  9  8  7  6  5  3 13  2 12 11  1 10  0]
當前迭代:6 全局最優值:18 本次最優值:18 本次最優染色體:[14  4  9  8  7  6  5  3 13  2 12 11  1 10  0]
當前迭代:7 全局最優值:18 本次最優值:18 本次最優染色體:[14  4  9  8  7  6  5  3 13  2 12 11  1 10  0]
當前迭代:8 全局最優值:18 本次最優值:18 本次最優染色體:[14  4  9  8  7  6  5  3 13  2 12 11  1 10  0]
當前迭代:9 全局最優值:18 本次最優值:18 本次最優染色體:[14  4  9  8  7  6  5  3 13  2 12 11  1 10  0]
當前迭代:10 全局最優值:18 本次最優值:18 本次最優染色體:[14  4  9  8  7  6  5  3 13  2 12 11  1 10  0]
當前迭代:11 全局最優值:18 本次最優值:18 本次最優染色體:[14  4  9  8  7  6  5  3 13  2 12 11  1 10  0]
當前迭代:12 全局最優值:18 本次最優值:18 本次最優染色體:[14  4  9  8  7  6  5  3 13  2 12 11  1 10  0]
當前迭代:13 全局最優值:18 本次最優值:18 本次最優染色體:[14  4  9  8  7  6  5  3 13  2 12 11  1 10  0]
當前迭代:14 全局最優值:18 本次最優值:18 本次最優染色體:[14  4  9  8  7  6  5  3 13  2 12 11  1 10  0]

 

  最優迭代:4 最優值:18 最優染色體:[14 4 9 8 7 6 5 3 13 2 12 11 1 10 0]

  18|14,4,9,8,7,6,5,3,13,2,12,11,1,10,0


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM