- 基本框架
- MDU(消息分發單元):包含一個消息處理任務,包含自身的消息隊列,是一個消息調度的基本單位。
- PID (功能子模塊) :框架中用PID作為模塊的划分,每個模塊具有自己的PID編號,根據功能和調度需求可以安排多個PID到一個MDU中,PID是消息通信的一個基本單位,每個PID提供一個消息處理入口。
- MQ (消息隊列) :使用消息隊列作為任務通信的數據結構。
- 消息處理流程
- 構建一個MDU模塊,注冊入框架中,初始MDU沒有注冊PID,未構建消息處理任務。
- 構建PID,注冊入對應的MDU中,如果是MDU中第一個PID,構建消息處理任務。消息處理任務從該MDU對應的消息隊列中取消息處理。
- 消息處理任務獲取消息后根據消息中攜帶的接收PID的信息分發到對應的PID模塊處理。
- 完整的消息交互流程
- 任務A申請消息,消息內容必須包括發送模塊PID編號、接收模塊PID編號、消息內容。
- 通過消息框架提供的消息發送接口直接發送消息,消息框架根據接收PID信息,將消息填入對應MDU的消息隊列中。
- MDU的消息處理任務B從消息隊列中獲取消息處理。
- MDU消息隊列會被多個任務並發寫入消息,被消息處理任務讀取消息處理,需要對消息隊列進行互斥和同步。詳見http://www.cnblogs.com/chencheng/p/2893421.html
- MUD、PID規划
- MDU作為一個調度基本單元,如果一個MDU中只有一個PID會導致系統中任務多,任務切換的開銷大。
- 如果MDU中包含太多PID,由於所有PID在一個消息隊列中串行運行,會影響PID的響應,影響系統性能。
- 功能緊耦合的PID放入一個MDU中。
- 耗時PID和實時要求高的PID不放入一個MDU中。
- 實現
MDU:

import myQueue from myThread import myThread class mdu: def __init__(self, mduID): self.mduId = mduID self.msgQue = myQueue.myQueue(10) self.map = {} def getMduID(self): return self.mduId def registPid(self, pidID, pid): self.map[pidID] = pid if 1==len(self.map): self.run() def msgEnQueue(self,msg): self.msgQue.enQueue(msg) def msgProcess(self): while True: msg = self.msgQue.deQueue() recvPid = msg.getRecvPid() self.map[recvPid].msgProcess(msg); def run(self): t = myThread(self.msgProcess) t.start()
PID:

import message import support import mdu import pdb class pid: def __init__(self, pidID): self.pidID = pidID self.registMe() def registMe(self): support.registPid(self) def getPidID(self): return self.pidID
SUPPORT:

import mdu import pdb mduMap = {} def registMdu(mdu): mduMap[mdu.getMduID()] = mdu def getMdu(revPid): return mduMap[revPid&0xFFFF0000>>16] def registPid(pid): mdu = getMdu(pid.getPidID()) #pdb.set_trace() mdu.registPid(pid.getPidID(), pid) def sendMsg(msg): mdu = getMdu(msg.getRecvPid()) mdu.msgEnQueue(msg)
MQ:

from threading import Lock from threading import Condition import threading class myQueue: def __init__(self, size): self.size = size self.list = list() self.lock = Lock() self.notFullCond = Condition(self.lock) self.notEmptyCond = Condition(self.lock) def isFull(self): if self.size == len(self.list): return True return False def isEmpty(self): if 0 == len(self.list): return True return False def enQueue(self, elem): self.lock.acquire() while self.isFull(): print('queue is full, waiting...') self.notFullCond.wait() print(threading.current_thread().getName() + ' product ' + str(elem)) self.list.append(elem) self.notEmptyCond.notify() self.lock.release() def deQueue(self): self.lock.acquire() while self.isEmpty(): print('queue is empty, waiting...') self.notEmptyCond.wait() elem = self.list[0] del(self.list[0]) print(threading.current_thread().getName() + ' consume ' + str(elem)) self.notFullCond.notify_all() self.lock.release() return elem