一. hpfeeds協議簡介
hpfeeds是一個輕量級的驗證發布-訂閱協議(authenticated publish-subscribe protocol)。
發布-訂閱協議:發布/訂閱協議定義了一種一對多的依賴關系,讓多個訂閱者對象同時監聽某一個主題對象。這個主題對象在自身狀態變化時,會通知所有訂閱者對象,使它們能夠自動更新自己的狀態。它是為了解決這樣一種情況的發生,一個對象的改變需要同時改變其他對象,而且它不知道具體有多少對象需要改變。這就類似於發傳單,目標發送通知,沒有指定特定的對象,通知會自動傳播,觀察者自己決定是否需要看傳單,發送者對於傳單是否被看一無所知。
hpfeeds把不同的數據用頻道來划分,支持傳送任意的二進制數據。由頻道的使用者決定傳送數據結構的形式。通道的驗證通過Authkey來完成,它由兩部分組成:ident和secret,相當於身份和密碼。把密碼和每次連接的一個隨機數進行hash摘要算法然后一起發送給服務器,這樣保證不會被竊聽,所以它可以運行在SSL、TLS下。
整個協議實現了三方的功能獨立。使用這種方式降低了應用與業務邏輯之間的耦合,統一一個對外的發布接口,只需要關心監聽的類型,不關心監聽的具體處理人。頻道的發布者只管發,不管訂閱者有沒有收到,很方便的建立一種一對多的依賴關系。在當一個對象的改變需要同時改變其他對象,而且它不知道具體有多少對象需要改變時,就可以使用訂閱發布模式。
二. hpfeeds的原理
hpfeeds協議通過以下幾部分實現:hpfeeds server, hpfeeds client,mongodb數據庫。
1. hpfeeds server:
- 負責為每個client的連接生成一個連接標志;
- 檢查請求連接的client的id和sha1(nonce+Authkey);
- 檢查client的請求類型,發布還是接收;
2. hpfeeds client:
每個hpfeeds client都即可以作為發布者也可以作為訂閱者,發布者和訂閱者並不要求必須同時存在。
3. Mongodb:
mongodb數據庫用來存儲每個client的id和secret,並且每當有client請求連接server時,server都會從mangodb中取出該client注冊時的id和secret進行對比。 若對比一致則認證通過,client可以和server正常建立連接;若不一致則client與server建立連接失敗。
4. client和server的認證過程:
Client和server的認證及發布/訂閱過程如下圖1所示:
hpfeeds協議建立連接及通信的過程:
- Client發起連接請求;
- server為每個client的連接生成一個連接標志,並將其發送給請求連接的client;
- client發送自己的id和sha1(nonce+Authkey)到server進行認證;
- server從mongodb中取出相應的信息檢查驗證,若認證通過,保持連接並執行后續操作。否則,斷開連接;
- client發起publish/subscribe請求;
- server檢查client請求消息的類型,發布/訂閱。
三. hpfeeds的消息格式
1. Wire Protocol:
每個hpfeeds協議消息都攜帶了一個固定格式的消息頭,有兩個參數:消息總長度和請求類型。如下代碼所示。
1 struct MsgHeader { 2 unit32_t_messageLength; // total message size, including this request type 3 unit8_t_opCode; 4 };
請求類型有以下幾種:
- error(0): errormessage
- info(1): server name, nonce
- auth (2): client id, sha1(nonce+authkey)
- publish (3): client id, channelname, payload
-
subscribe (4): client id, channelname
一個完整的發布類型的消息如下圖所示,由消息頭、client_id的長度、client_id、channelname的長度、channelname、傳輸內容payload。payload可以是任意類型的二進制數據。
四. hpfeeds源碼解析
Hpfeeds協議server與client的通訊主要也是使用TCP套接字的方式。
Hpfeeds server采用了事件驅動的方式來處理client的請求。這樣做是為了應對高連接數高吞吐量的client請求,使用這種方法可以同時接收數百、數千甚至數萬個事件,無論它們是內部請求還是網絡連接,都可以高效地處理它們的操作。同時還能夠極大的降低資源占用,增大服務接待能力,並提高網絡傳輸效率。
Hpfeeds server與mongodb的連接及數據交互並沒有使用Python自帶的pymongo模塊,而是使用了自己編寫的一個基於事件驅動的MongoConn模塊。這樣做的目的也是為了處理高連接數的client請求。下面主要對hpfeeds的server和client的源碼進行解析。
1. hpfeeds server 源碼
Hpfeeds server的工作方式,首先連接mongodb數據庫,監聽hpfeeds server的服務端口,設置事件監聽器,關聯相應處理函數,將事件監聽器加入事件循環,啟動事件循環進行監聽。如果有client請求來,則會觸發相應的事件,調用與事件相關聯的函數進行處理操作。Hpfeeds server的主程序代碼如下。
1 #!/usr/bin/env python 2 3 4 import sys 5 6 import struct 7 import hashlib 8 import collections 9 import random 10 11 import logging 12 logging.basicConfig(level=logging.INFO) 13 14 from evnet import loop, unloop, listenplain, EventGen # 用於實現事件循環的模塊 15 from evnet.mongodb import MongoConn 16 # 注意:Python本身有對mongodb進行操作的模塊,但在hpfeeds server中沒有使用, 17 # 這里它自己實現了一個對mongodb進行操作的模塊MongoConn,為了實現使用事件循 18 # 的方式來對數據庫進行操作 19 20 FBIP = '0.0.0.0' # hpfeeds server監聽的地址和端口號 21 FBPORT = 10000 22 FBNAME = '@hp2' 23 MONGOIP = '127.0.0.1' 24 MONGOPORT = 27017 25 26 OP_ERROR = 0 27 OP_INFO = 1 28 OP_AUTH = 2 29 OP_PUBLISH = 3 30 OP_SUBSCRIBE = 4 31 OP_UNSUBSCRIBE = 5 32 33 MAXBUF = 10* (1024**2) 34 SIZES = { 35 OP_ERROR: 5+MAXBUF, 36 OP_INFO: 5+256+20, 37 OP_AUTH: 5+256+20, 38 OP_PUBLISH: 5+MAXBUF, 39 OP_SUBSCRIBE: 5+256*2, 40 OP_UNSUBSCRIBE: 5+256*2, 41 } 42 43 class BadClient(Exception): 44 pass 45 46 class FeedUnpack(object): # 對client傳來的數據進行解碼 47 def __init__(self): 48 self.buf = bytearray() 49 def __iter__(self): 50 return self 51 def next(self): 52 return self.unpack() 53 def feed(self, data): # 將client傳來的數據存入self.buf 54 self.buf.extend(data) 55 def unpack(self): 56 if len(self.buf) < 5: # 如果self.buf的總長度小於5,說明請求消息為空 57 # 因為client各種請求類型的消息長度都是大於5的 58 raise StopIteration('No message.') 59 60 ml, opcode = struct.unpack('!iB', buffer(self.buf,0,5)) 61 # 解碼出ml和opcode 62 if ml > SIZES.get(opcode, MAXBUF): # ml為hpfeeds消息的總長度 63 raise BadClient('Not respecting MAXBUF.') 64 65 if len(self.buf) < ml: # self.buf中的數據長度小於該條消息的總長度,拋出異常 66 raise StopIteration('No message.') 67 68 data = bytearray(buffer(self.buf, 5, ml-5)) 69 del self.buf[:ml] # 刪除self.buf中的數據 70 return opcode, data 71 # data中包含了len(client_id),client_id,length(channelname), channelname,payload 72 73 74 class FeedConn(EventGen): 75 def __init__(self, conn, addr, db): 76 EventGen.__init__(self) 77 self.conn = conn 78 self.addr = addr 79 self.db = db 80 self.pubchans = set() 81 self.subchans = set() 82 self.idents = set() 83 self.delay = False 84 85 self.rand = struct.pack('<I', random.randint(2**31,2**32-1)) # 產生一個隨機數 86 self.fu = FeedUnpack() 87 88 conn._on('read', self.io_in) 89 conn._on('close', self.closed) 90 91 self.sendinfo() 92 93 def sendinfo(self): 94 self.conn.write(self.msginfo()) 95 96 def auth(self, ident, hash): # server和client的認證函數 97 p = self.db.query('hpfeeds.auth_key', {'identifier': str(ident)}, limit=1) 98 # 查詢mongodb中的數據,返回的p為一個Promise()對象 99 p._when(self.checkauth, hash) # 調用checkauth函數對client進行認證 100 101 def dbexc(e): # mongodb查詢異常處理函數 102 logging.critical('Database query exception. {0}'.format(e)) 103 self.error('Database query exception.') 104 105 p._except(dbexc) # 如果出現異常則執行響應的處理函數 106 107 self.delay = True 108 109 def checkauth(self, r, hash): # server與client的認證處理函數 110 if len(r) > 0: # r是self._result 111 akobj = r[0] 112 akhash = hashlib.sha1('{0}{1}'.format(self.rand, akobj['secret'])).digest() 113 if akhash == hash: # 將數據庫中取得的secret與self.rand進行hash摘要算法進行對比 114 self.pubchans.update(akobj.get('publish', [])) # 更新發布頻道 115 self.subchans.update(akobj.get('subscribe', []))# 更新訂閱頻道 116 self.idents.add(akobj['identifier']) # 將認證成功的client_id添加到self.idents 117 logging.info('Auth success by {0}.'.format(akobj['identifier'])) 118 else: 119 self.error('authfail.') 120 logging.info('Auth failure by {0}.'.format(akobj['identifier'])) 121 else: 122 self.error('authfail.') 123 self.delay = False 124 self.io_in(b'') 125 126 def closed(self, reason): 127 logging.debug('Connection closed, {0}'.format(reason)) 128 self._event('close', self) 129 130 def may_publish(self, chan): 131 return chan in self.pubchans 132 133 def may_subscribe(self, chan): 134 return chan in self.subchans 135 136 def io_in(self, data): # 傳送請求發布和訂閱的數據函數 137 self.fu.feed(data) # 數據存入self.buf 138 if self.delay: # 經FeedUnpack的實例處理后的數據為opcode, data 139 return # data中包含了len(client_id),client_id,length(channelname), channelname,payload 140 try: 141 for opcode, data in self.fu: 142 if opcode == OP_PUBLISH: # 處理發布請求 143 rest = buffer(data, 0) # 數據存入buffer 144 ident, rest = rest[1:1+ord(rest[0])], buffer(rest, 1+ord(rest[0])) 145 chan, rest = rest[1:1+ord(rest[0])], buffer(rest, 1+ord(rest[0])) 146 # 解碼出發布請求包中的數據 147 if not ident in self.idents: 148 self.error('identfail.') 149 continue 150 151 if not self.may_publish(chan): 152 self.error('accessfail.') 153 continue 154 155 self._event('publish', self, chan, data) # 觸發發布請求的處理事件 156 elif opcode == OP_SUBSCRIBE: # 處理訂閱請求 157 rest = buffer(data, 0) 158 ident, chan = rest[1:1+ord(rest[0])], rest[1+ord(rest[0]):] 159 160 if not ident in self.idents: 161 self.error('identfail.') 162 continue 163 164 checkchan = chan 165 if chan.endswith('..broker'): checkchan = chan.rsplit('..broker', 1)[0] 166 167 if not self.may_subscribe(checkchan): 168 self.error('accessfail.') 169 continue 170 171 self._event('subscribe', self, chan, ident) # 觸發訂閱請求的處理事件 172 elif opcode == OP_UNSUBSCRIBE: # 處理取消訂閱請求 173 rest = buffer(data, 0) 174 ident, chan = rest[1:1+ord(rest[0])], rest[1+ord(rest[0]):] 175 176 if not ident in self.idents: 177 self.error('identfail.') 178 continue 179 180 if not self.may_subscribe(chan): 181 self.error('accessfail.') 182 continue 183 184 self._event('unsubscribe', self, chan, ident)# 觸發取消訂閱請求的事件 185 elif opcode == OP_AUTH: # 處理認證請求 186 rest = buffer(data, 0) 187 ident, hash = rest[1:1+ord(rest[0])], rest[1+ord(rest[0]):] 188 self.auth(ident, hash) # 認證函數 189 if self.delay: 190 return 191 192 except BadClient: 193 self.conn.close() # 關閉客戶端與服務器的連接 194 logging.warn('Disconnecting bad client: {0}'.format(self.addr)) 195 def forward(self, data): 196 self.conn.write(self.msghdr(OP_PUBLISH, data)) 197 198 def error(self, emsg): 199 self.conn.write(self.msgerror(emsg)) 200 201 def msgerror(self, emsg): 202 return self.msghdr(OP_ERROR, emsg) 203 204 def msginfo(self): 205 return self.msghdr(OP_INFO, '{0}{1}{2}'.format(chr(len(FBNAME)%0xff), FBNAME, self.rand)) 206 207 def msghdr(self, op, data): # 對消息進行封包處理的函數 208 return struct.pack('!iB', 5+len(data), op) + data 209 210 def msgpublish(self, ident, chan, data): # 發布消息預處理函數 211 return self.msghdr(OP_PUBLISH, struct.pack('!B', len(ident)) + ident + struct.pack('!B', len(chan)) + chan + data) 212 213 def publish(self, ident, chan, data): # 發布消息函數 214 self.conn.write(self.msgpublish(ident, chan, data)) 215 216 class FeedBroker(object): 217 def __init__(self): 218 self.ready = False 219 220 self.db = None 221 self.initdb() # 初始化mongodb數據庫 222 223 self.listener = listenplain(host=FBIP, port=FBPORT) # hpfeeds server 開始監聽端口,返回的listener是一個監聽事件 224 self.listener._on('close', self._lclose) # 為事件關聯函數 225 self.listener._on('connection', self._newconn) # 有新的client連接則觸發該事件 226 227 self.connections = set() # 連接的client集合 228 self.subscribermap = collections.defaultdict(list) 229 self.conn2chans = collections.defaultdict(list) 230 231 def initdb(self): 232 self.db = MongoConn(MONGOIP, MONGOPORT) # 連接mongodb 233 self.db._on('ready', self._dbready) # 關聯事件和回調函數 234 self.db._on('close', self._dbclose) 235 236 def _dbready(self): 237 self.ready = True 238 logging.info('Database ready.') 239 240 def _dbclose(self, e): 241 logging.critical('Database connection closed ({0}). Exiting.'.format(e)) 242 unloop() 243 244 def _lclose(self, e): 245 logging.critical('Listener closed ({0}). Exiting.'.format(e)) 246 unloop() 247 248 def _newconn(self, c, addr): # client請求連接server的處理函數 249 logging.debug('Connection from {0}.'.format(addr)) 250 fc = FeedConn(c, addr, self.db) # 處理client的各種類型的請求的監聽事件 251 self.connections.add(fc) 252 fc._on('close', self._connclose) # 為fc關聯事件和回調函數 253 fc._on('subscribe', self._subscribe) 254 fc._on('unsubscribe', self._unsubscribe) 255 fc._on('publish', self._publish) 256 257 def _connclose(self, c): # 關閉server與client連接 258 self.connections.remove(c) 259 for chan in self.conn2chans[c]: 260 self.subscribermap[chan].remove(c) 261 for ident in c.idents: 262 self._brokerchan(c, chan, ident, 0) 263 def _publish(self, c, chan, data): 264 logging.debug('broker publish to {0} by {1}'.format(chan, c.addr)) 265 for c2 in self.subscribermap[chan]: # 該頻道中的訂閱者 266 if c2 == c: continue # 把發布者本身除外 267 c2.forward(data) # 向該頻道的所有訂閱者推送要發布的數據 268 269 def _subscribe(self, c, chan, ident): # 訂閱請求的處理僅把訂閱者添加到頻道中,然后觸發推送數據的循環 270 logging.debug('broker subscribe to {0} by {2} @ {1}'.format(chan, c.addr, ident)) 271 self.subscribermap[chan].append(c) 272 self.conn2chans[c].append(chan) 273 self._brokerchan(c, chan, ident, 1) 274 275 def _unsubscribe(self, c, chan, ident): # 某個客戶端取消對某個頻道的訂閱 276 logging.debug('broker unsubscribe to {0} by {1}'.format(chan, c.addr)) 277 self.subscribermap[chan].remove(c) 278 self.conn2chans[c].remove(chan) 279 self._brokerchan(c, chan, ident, 0) 280 281 def _brokerchan(self, c, chan, ident, subscribe=0): # 觸發推送數據循環 282 data = 'join' if subscribe else 'leave' 283 if self.subscribermap[chan+'..broker']: 284 for c2 in self.subscribermap[chan+'..broker']: 285 if c2 == c: continue 286 c2.publish(ident, chan+'..broker', data) 287 288 def main(): 289 fb = FeedBroker() 290 291 loop() # 啟動事件監聽循環 292 return 0 293 294 if __name__ == '__main__': 295 sys.exit(main())
2. hpfeeds client源碼
Hpfeeds client的工作方式,與server成功建立連接后,開始相應的publish/subscribe操作。如果是做為訂閱者,則會與server一直保持連接狀態,不斷讀取訂閱頻道中的內容;如果是作為發布者,則每次推送完數據后,不管訂閱者有沒有收到信息,都立刻關閉與server的連接。
1 #!/usr/bin/env python 2 3 import sys 4 import optparse # optparse模塊用於處理命令行參數 5 import datetime 6 import logging 7 import string 8 9 import hpfeeds 10 11 def log(msg): 12 print '[feedcli] {0}'.format(msg) 13 14 def main(opts, action, pubdata=None): 15 outfd = None 16 if opts.output: 17 try: outfd = open(opts.output, 'a') 18 except: 19 log('could not open output file for message log.') 20 return 1 21 else: 22 outfd = sys.stdout 23 24 try: hpc = hpfeeds.new(opts.host, opts.port, opts.ident, opts.secret, certfile=opts.certfile) 25 # 連接hpfeeds server,返回值hpc為hpfeeds模塊中HPC類對象, 26 # 如果client與server成功連接,並認證成功,則程序繼續往后執行;否則拋出異常,程序退出 27 except hpfeeds.FeedException, e: 28 log('Error: {0}'.format(e)) 29 return 1 30 31 log('connected to {0}'.format(hpc.brokername)) 32 33 if action == 'subscribe': # 訂閱請求 34 def on_message(ident, chan, payload): # 顯示記錄收到的訂閱信息 35 if [i for i in payload[:20] if i not in string.printable]: 36 log('publish to {0} by {1}: {2}'.format(chan, ident, payload[:20].encode('hex') + '...')) 37 else: 38 log('publish to {0} by {1}: {2}'.format(chan, ident, payload)) 39 40 def on_error(payload): # 記錄錯誤信息 41 log('Error message from broker: {0}'.format(payload)) 42 hpc.stop() # 停止循環 43 44 hpc.subscribe(opts.channels) 45 try: hpc.run(on_message, on_error) # 接收server推送過來的數據,調用on_message(),on_error()進行處理 46 except hpfeeds.FeedException, e: # 拋出異常,程序退出 47 log('Error: {0}'.format(e)) 48 return 1 49 50 elif action == 'publish': # 發布請求 51 hpc.publish(opts.channels, pubdata) # 推送數據 52 emsg = hpc.wait() # 若推送成功,返回None;否則,返回其它值 53 if emsg: print 'got error from server:', emsg 54 55 elif action == 'sendfile': 56 pubfile = open(pubdata, 'rb').read() 57 hpc.publish(opts.channels, pubfile) 58 59 log('closing connection.') 60 hpc.close() 61 62 return 0 63 64 def opts(): # 獲取命令行參數 65 usage = "usage: %prog -i ident -s secret --host host -p port -c channel1 [-c channel2, ...] <action> [<data>]" 66 parser = optparse.OptionParser(usage=usage) 67 parser.add_option("-c", "--chan", # 定義命令行參數 68 action="append", dest='channels', nargs=1, type='string', 69 help="channel (can be used multiple times)") 70 parser.add_option("-i", "--ident", 71 action="store", dest='ident', nargs=1, type='string', 72 help="authkey identifier") 73 parser.add_option("-s", "--secret", 74 action="store", dest='secret', nargs=1, type='string', 75 help="authkey secret") 76 parser.add_option("--host", 77 action="store", dest='host', nargs=1, type='string', 78 help="broker host") 79 parser.add_option("-p", "--port", 80 action="store", dest='port', nargs=1, type='int', 81 help="broker port") 82 parser.add_option("-o", "--output", 83 action="store", dest='output', nargs=1, type='string', 84 help="publish log filename") 85 parser.add_option("--certfile", 86 action="store", dest='certfile', nargs=1, type='string', 87 help="certfile for ssl verification (CA)", default=None) 88 parser.add_option("--debug", 89 action="store_const", dest='debug', 90 help="enable debug log output", default=False, const=True) 91 92 options, args = parser.parse_args() # 定義好所有的命令行參數,調用 parse_args()來解析程序的命令行 93 94 if len(args) < 1: 95 parser.error('You need to give "subscribe" or "publish" as <action>.') 96 if args[0] not in ['subscribe', 'publish', 'sendfile']: 97 parser.error('You need to give "subscribe" or "publish" as <action>.') 98 if options.debug: 99 logging.basicConfig(level=logging.DEBUG) 100 else: 101 logging.basicConfig(level=logging.CRITICAL) 102 103 action = args[0] 104 data = None 105 if action == 'publish': 106 data = ' '.join(args[1:]) 107 elif action == 'sendfile': 108 data = ' '.join(args[1:]) 109 110 return options, action, data 111 112 if __name__ == '__main__': 113 options, action, data = opts() # 獲取命令行參數 114 try: 115 sys.exit(main(options, action, pubdata=data)) # 從main()函數開始執行 116 except KeyboardInterrupt: 117 sys.exit(0)