hpfeeds協議解析


一. hpfeeds協議簡介

  hpfeeds是一個輕量級的驗證發布-訂閱協議(authenticated publish-subscribe protocol)

發布-訂閱協議:發布/訂閱協議定義了一種一對多的依賴關系,讓多個訂閱者對象同時監聽某一個主題對象。這個主題對象在自身狀態變化時,會通知所有訂閱者對象,使它們能夠自動更新自己的狀態。它是為了解決這樣一種情況的發生,一個對象的改變需要同時改變其他對象,而且它不知道具體有多少對象需要改變。這就類似於發傳單,目標發送通知,沒有指定特定的對象,通知會自動傳播,觀察者自己決定是否需要看傳單,發送者對於傳單是否被看一無所知。

  hpfeeds把不同的數據用頻道來划分,支持傳送任意的二進制數據。由頻道的使用者決定傳送數據結構的形式。通道的驗證通過Authkey來完成,它由兩部分組成:identsecret,相當於身份和密碼。把密碼和每次連接的一個隨機數進行hash摘要算法然后一起發送給服務器,這樣保證不會被竊聽,所以它可以運行在SSLTLS下。

  整個協議實現了三方的功能獨立。使用這種方式降低了應用與業務邏輯之間的耦合,統一一個對外的發布接口,只需要關心監聽的類型,不關心監聽的具體處理人。頻道的發布者只管發,不管訂閱者有沒有收到,很方便的建立一種一對多的依賴關系。在當一個對象的改變需要同時改變其他對象,而且它不知道具體有多少對象需要改變時,就可以使用訂閱發布模式。

 

二. hpfeeds的原理

  hpfeeds協議通過以下幾部分實現:hpfeeds serverhpfeeds clientmongodb數據庫。

1. hpfeeds server: 

  • 負責為每個client的連接生成一個連接標志;
  • 檢查請求連接的clientidsha1(nonce+Authkey)
  • 檢查client的請求類型,發布還是接收;

2. hpfeeds client: 

  每個hpfeeds client都即可以作為發布者也可以作為訂閱者,發布者和訂閱者並不要求必須同時存在。

3. Mongodb: 

  mongodb數據庫用來存儲每個clientidsecret,並且每當有client請求連接server時,server都會從mangodb中取出該client注冊時的idsecret進行對比。 若對比一致則認證通過,client可以和server正常建立連接;若不一致則clientserver建立連接失敗。

4. client和server的認證過程:

  Clientserver的認證及發布/訂閱過程如下圖1所示:  

     

hpfeeds協議建立連接及通信的過程: 

  1. Client發起連接請求;
  2. server為每個client的連接生成一個連接標志,並將其發送給請求連接的client
  3. client發送自己的idsha1(nonce+Authkey)server進行認證;
  4. servermongodb中取出相應的信息檢查驗證,若認證通過,保持連接並執行后續操作。否則,斷開連接;
  5. client發起publish/subscribe請求;
  6. server檢查client請求消息的類型,發布/訂閱。

 

三. hpfeeds的消息格式

1. Wire Protocol: 

  每個hpfeeds協議消息都攜帶了一個固定格式的消息頭,有兩個參數:消息總長度和請求類型。如下代碼所示。

1 struct MsgHeader {
2      unit32_t_messageLength;           // total message size, including this request type
3      unit8_t_opCode;
4 };

  請求類型有以下幾種:

  • error(0): errormessage
  • info(1): server name, nonce  
  • auth (2): client id, sha1(nonce+authkey)
  • publish (3): client id, channelname, payload 
  • subscribe (4): client id, channelname

  一個完整的發布類型的消息如下圖所示,由消息頭、client_id的長度、client_idchannelname的長度、channelname、傳輸內容payloadpayload可以是任意類型的二進制數據。

 

四. hpfeeds源碼解析

  Hpfeeds協議serverclient的通訊主要也是使用TCP套接字的方式。

  Hpfeeds server采用了事件驅動的方式來處理client的請求。這樣做是為了應對高連接數高吞吐量的client請求,使用這種方法可以同時接收數百、數千甚至數萬個事件,無論它們是內部請求還是網絡連接,都可以高效地處理它們的操作。同時還能夠極大的降低資源占用,增大服務接待能力,並提高網絡傳輸效率。

  Hpfeeds servermongodb的連接及數據交互並沒有使用Python自帶的pymongo模塊,而是使用了自己編寫的一個基於事件驅動的MongoConn模塊。這樣做的目的也是為了處理高連接數的client請求。下面主要對hpfeeds的server和client的源碼進行解析。

1. hpfeeds server 源碼

  Hpfeeds server的工作方式,首先連接mongodb數據庫,監聽hpfeeds server的服務端口,設置事件監聽器,關聯相應處理函數,將事件監聽器加入事件循環,啟動事件循環進行監聽。如果有client請求來,則會觸發相應的事件,調用與事件相關聯的函數進行處理操作。Hpfeeds server的主程序代碼如下。

  1 #!/usr/bin/env python
  2 
  3 
  4 import sys
  5 
  6 import struct
  7 import hashlib
  8 import collections
  9 import random
 10 
 11 import logging
 12 logging.basicConfig(level=logging.INFO)
 13 
 14 from evnet import loop, unloop, listenplain, EventGen       # 用於實現事件循環的模塊
 15 from evnet.mongodb import MongoConn                         
 16 # 注意:Python本身有對mongodb進行操作的模塊,但在hpfeeds server中沒有使用,
 17 # 這里它自己實現了一個對mongodb進行操作的模塊MongoConn,為了實現使用事件循
 18 # 的方式來對數據庫進行操作
 19 
 20 FBIP = '0.0.0.0'        # hpfeeds server監聽的地址和端口號
 21 FBPORT = 10000
 22 FBNAME = '@hp2'
 23 MONGOIP = '127.0.0.1'
 24 MONGOPORT = 27017
 25 
 26 OP_ERROR    = 0
 27 OP_INFO     = 1
 28 OP_AUTH     = 2
 29 OP_PUBLISH  = 3
 30 OP_SUBSCRIBE    = 4
 31 OP_UNSUBSCRIBE  = 5
 32 
 33 MAXBUF = 10* (1024**2)
 34 SIZES = {
 35     OP_ERROR: 5+MAXBUF,
 36     OP_INFO: 5+256+20,
 37     OP_AUTH: 5+256+20,
 38     OP_PUBLISH: 5+MAXBUF,
 39     OP_SUBSCRIBE: 5+256*2,
 40     OP_UNSUBSCRIBE: 5+256*2,
 41 }
 42 
 43 class BadClient(Exception):
 44     pass
 45 
 46 class FeedUnpack(object):            # 對client傳來的數據進行解碼
 47     def __init__(self):
 48         self.buf = bytearray()
 49     def __iter__(self):
 50         return self
 51     def next(self):
 52         return self.unpack()
 53     def feed(self, data):           # 將client傳來的數據存入self.buf
 54         self.buf.extend(data)       
 55     def unpack(self):
 56         if len(self.buf) < 5:       # 如果self.buf的總長度小於5,說明請求消息為空
 57                                     # 因為client各種請求類型的消息長度都是大於5的
 58             raise StopIteration('No message.')
 59 
 60         ml, opcode = struct.unpack('!iB', buffer(self.buf,0,5))
 61                                                 # 解碼出ml和opcode
 62         if ml > SIZES.get(opcode, MAXBUF):      # ml為hpfeeds消息的總長度
 63             raise BadClient('Not respecting MAXBUF.')
 64 
 65         if len(self.buf) < ml:      # self.buf中的數據長度小於該條消息的總長度,拋出異常
 66             raise StopIteration('No message.')
 67         
 68         data = bytearray(buffer(self.buf, 5, ml-5))
 69         del self.buf[:ml]       # 刪除self.buf中的數據
 70         return opcode, data     
 71         # data中包含了len(client_id),client_id,length(channelname), channelname,payload
 72 
 73 
 74 class FeedConn(EventGen):
 75     def __init__(self, conn, addr, db):
 76         EventGen.__init__(self)
 77         self.conn = conn
 78         self.addr = addr
 79         self.db = db
 80         self.pubchans = set()
 81         self.subchans = set()
 82         self.idents = set()
 83         self.delay = False
 84 
 85         self.rand = struct.pack('<I', random.randint(2**31,2**32-1))    # 產生一個隨機數
 86         self.fu = FeedUnpack()
 87 
 88         conn._on('read', self.io_in)
 89         conn._on('close', self.closed)
 90 
 91         self.sendinfo()
 92 
 93     def sendinfo(self):
 94         self.conn.write(self.msginfo())
 95 
 96     def auth(self, ident, hash):    # server和client的認證函數
 97         p = self.db.query('hpfeeds.auth_key', {'identifier': str(ident)}, limit=1)
 98         # 查詢mongodb中的數據,返回的p為一個Promise()對象
 99         p._when(self.checkauth, hash)   # 調用checkauth函數對client進行認證
100 
101         def dbexc(e):       # mongodb查詢異常處理函數
102             logging.critical('Database query exception. {0}'.format(e))
103             self.error('Database query exception.')
104         
105         p._except(dbexc)    # 如果出現異常則執行響應的處理函數
106 
107         self.delay = True
108 
109     def checkauth(self, r, hash):   # server與client的認證處理函數
110         if len(r) > 0:              # r是self._result
111             akobj = r[0]            
112             akhash = hashlib.sha1('{0}{1}'.format(self.rand, akobj['secret'])).digest()
113             if akhash == hash:      # 將數據庫中取得的secret與self.rand進行hash摘要算法進行對比
114                 self.pubchans.update(akobj.get('publish', []))  # 更新發布頻道
115                 self.subchans.update(akobj.get('subscribe', []))# 更新訂閱頻道
116                 self.idents.add(akobj['identifier'])        # 將認證成功的client_id添加到self.idents
117                 logging.info('Auth success by {0}.'.format(akobj['identifier']))
118             else:
119                 self.error('authfail.')
120                 logging.info('Auth failure by {0}.'.format(akobj['identifier']))
121         else:
122             self.error('authfail.')
123         self.delay = False
124         self.io_in(b'')
125     
126     def closed(self, reason):
127         logging.debug('Connection closed, {0}'.format(reason))
128         self._event('close', self)
129 
130     def may_publish(self, chan):
131         return chan in self.pubchans
132 
133     def may_subscribe(self, chan):
134         return chan in self.subchans
135 
136     def io_in(self, data):      # 傳送請求發布和訂閱的數據函數
137         self.fu.feed(data)      # 數據存入self.buf
138         if self.delay:          # 經FeedUnpack的實例處理后的數據為opcode, data
139             return              # data中包含了len(client_id),client_id,length(channelname), channelname,payload
140         try:
141             for opcode, data in self.fu:
142                 if opcode == OP_PUBLISH:                        # 處理發布請求
143                     rest = buffer(data, 0)                      # 數據存入buffer
144                     ident, rest = rest[1:1+ord(rest[0])], buffer(rest, 1+ord(rest[0]))
145                     chan, rest = rest[1:1+ord(rest[0])], buffer(rest, 1+ord(rest[0]))
146                                                                 # 解碼出發布請求包中的數據
147                     if not ident in self.idents:
148                         self.error('identfail.')
149                         continue
150 
151                     if not self.may_publish(chan):
152                         self.error('accessfail.')
153                         continue
154                     
155                     self._event('publish', self, chan, data)    # 觸發發布請求的處理事件
156                 elif opcode == OP_SUBSCRIBE:                    # 處理訂閱請求
157                     rest = buffer(data, 0)
158                     ident, chan = rest[1:1+ord(rest[0])], rest[1+ord(rest[0]):]
159 
160                     if not ident in self.idents:
161                         self.error('identfail.')
162                         continue
163 
164                     checkchan = chan
165                     if chan.endswith('..broker'): checkchan = chan.rsplit('..broker', 1)[0]
166 
167                     if not self.may_subscribe(checkchan):
168                         self.error('accessfail.')
169                         continue
170 
171                     self._event('subscribe', self, chan, ident) # 觸發訂閱請求的處理事件
172                 elif opcode == OP_UNSUBSCRIBE:                  # 處理取消訂閱請求
173                     rest = buffer(data, 0)
174                     ident, chan = rest[1:1+ord(rest[0])], rest[1+ord(rest[0]):]
175 
176                     if not ident in self.idents:
177                         self.error('identfail.')
178                         continue
179 
180                     if not self.may_subscribe(chan):
181                         self.error('accessfail.')
182                         continue
183 
184                     self._event('unsubscribe', self, chan, ident)# 觸發取消訂閱請求的事件
185                 elif opcode == OP_AUTH:                         # 處理認證請求
186                     rest = buffer(data, 0)
187                     ident, hash = rest[1:1+ord(rest[0])], rest[1+ord(rest[0]):]
188                     self.auth(ident, hash)                      # 認證函數
189                     if self.delay:
190                         return
191 
192         except BadClient:
193             self.conn.close()           # 關閉客戶端與服務器的連接
194             logging.warn('Disconnecting bad client: {0}'.format(self.addr))
195     def forward(self, data):
196         self.conn.write(self.msghdr(OP_PUBLISH, data))
197 
198     def error(self, emsg):
199         self.conn.write(self.msgerror(emsg))
200 
201     def msgerror(self, emsg):
202         return self.msghdr(OP_ERROR, emsg)
203 
204     def msginfo(self):
205         return self.msghdr(OP_INFO, '{0}{1}{2}'.format(chr(len(FBNAME)%0xff), FBNAME, self.rand))
206 
207     def msghdr(self, op, data):                 # 對消息進行封包處理的函數
208         return struct.pack('!iB', 5+len(data), op) + data
209 
210     def msgpublish(self, ident, chan, data):    # 發布消息預處理函數
211         return self.msghdr(OP_PUBLISH, struct.pack('!B', len(ident)) + ident + struct.pack('!B', len(chan)) + chan + data)
212 
213     def publish(self, ident, chan, data):       # 發布消息函數
214         self.conn.write(self.msgpublish(ident, chan, data))
215 
216 class FeedBroker(object):
217     def __init__(self):
218         self.ready = False
219 
220         self.db = None
221         self.initdb()           # 初始化mongodb數據庫
222 
223         self.listener = listenplain(host=FBIP, port=FBPORT)     # hpfeeds server 開始監聽端口,返回的listener是一個監聽事件
224         self.listener._on('close', self._lclose)                # 為事件關聯函數
225         self.listener._on('connection', self._newconn)          # 有新的client連接則觸發該事件
226 
227         self.connections = set()                                # 連接的client集合
228         self.subscribermap = collections.defaultdict(list)      
229         self.conn2chans = collections.defaultdict(list)
230 
231     def initdb(self):
232         self.db = MongoConn(MONGOIP, MONGOPORT)     # 連接mongodb
233         self.db._on('ready', self._dbready)         # 關聯事件和回調函數
234         self.db._on('close', self._dbclose)
235 
236     def _dbready(self):
237         self.ready = True
238         logging.info('Database ready.')
239 
240     def _dbclose(self, e):
241         logging.critical('Database connection closed ({0}). Exiting.'.format(e))
242         unloop()
243 
244     def _lclose(self, e):
245         logging.critical('Listener closed ({0}). Exiting.'.format(e))
246         unloop()
247 
248     def _newconn(self, c, addr):                                # client請求連接server的處理函數
249         logging.debug('Connection from {0}.'.format(addr))
250         fc = FeedConn(c, addr, self.db)                         # 處理client的各種類型的請求的監聽事件
251         self.connections.add(fc)                                
252         fc._on('close', self._connclose)                        # 為fc關聯事件和回調函數
253         fc._on('subscribe', self._subscribe)
254         fc._on('unsubscribe', self._unsubscribe)
255         fc._on('publish', self._publish)
256 
257     def _connclose(self, c):                                    # 關閉server與client連接
258         self.connections.remove(c)
259         for chan in self.conn2chans[c]:
260             self.subscribermap[chan].remove(c)
261             for ident in c.idents:
262                 self._brokerchan(c, chan, ident, 0)
263     def _publish(self, c, chan, data):
264         logging.debug('broker publish to {0} by {1}'.format(chan, c.addr))
265         for c2 in self.subscribermap[chan]:             # 該頻道中的訂閱者
266             if c2 == c: continue                        # 把發布者本身除外
267             c2.forward(data)                            # 向該頻道的所有訂閱者推送要發布的數據
268         
269     def _subscribe(self, c, chan, ident):               # 訂閱請求的處理僅把訂閱者添加到頻道中,然后觸發推送數據的循環
270         logging.debug('broker subscribe to {0} by {2} @ {1}'.format(chan, c.addr, ident))
271         self.subscribermap[chan].append(c)
272         self.conn2chans[c].append(chan)
273         self._brokerchan(c, chan, ident, 1)
274     
275     def _unsubscribe(self, c, chan, ident):             # 某個客戶端取消對某個頻道的訂閱
276         logging.debug('broker unsubscribe to {0} by {1}'.format(chan, c.addr))
277         self.subscribermap[chan].remove(c)
278         self.conn2chans[c].remove(chan)
279         self._brokerchan(c, chan, ident, 0)
280 
281     def _brokerchan(self, c, chan, ident, subscribe=0):     # 觸發推送數據循環
282         data = 'join' if subscribe else 'leave'
283         if self.subscribermap[chan+'..broker']:
284             for c2 in self.subscribermap[chan+'..broker']:
285                 if c2 == c: continue
286                 c2.publish(ident, chan+'..broker', data)
287 
288 def main():
289     fb = FeedBroker()
290 
291     loop()                  # 啟動事件監聽循環
292     return 0
293  
294 if __name__ == '__main__':
295     sys.exit(main())

2. hpfeeds client源碼

  Hpfeeds client的工作方式,與server成功建立連接后,開始相應的publish/subscribe操作。如果是做為訂閱者,則會與server一直保持連接狀態,不斷讀取訂閱頻道中的內容;如果是作為發布者,則每次推送完數據后,不管訂閱者有沒有收到信息,都立刻關閉與server的連接。 

  1 #!/usr/bin/env python
  2 
  3 import sys
  4 import optparse     # optparse模塊用於處理命令行參數
  5 import datetime
  6 import logging
  7 import string
  8 
  9 import hpfeeds
 10 
 11 def log(msg):
 12     print '[feedcli] {0}'.format(msg)
 13 
 14 def main(opts, action, pubdata=None):
 15     outfd = None
 16     if opts.output:
 17         try: outfd = open(opts.output, 'a')
 18         except:
 19             log('could not open output file for message log.')
 20             return 1
 21     else:
 22         outfd = sys.stdout
 23 
 24     try: hpc = hpfeeds.new(opts.host, opts.port, opts.ident, opts.secret, certfile=opts.certfile)
 25     # 連接hpfeeds server,返回值hpc為hpfeeds模塊中HPC類對象,
 26     # 如果client與server成功連接,並認證成功,則程序繼續往后執行;否則拋出異常,程序退出
 27     except hpfeeds.FeedException, e:
 28         log('Error: {0}'.format(e))
 29         return 1
 30     
 31     log('connected to {0}'.format(hpc.brokername))
 32 
 33     if action == 'subscribe':                           # 訂閱請求
 34         def on_message(ident, chan, payload):           # 顯示記錄收到的訂閱信息
 35             if [i for i in payload[:20] if i not in string.printable]:
 36                 log('publish to {0} by {1}: {2}'.format(chan, ident, payload[:20].encode('hex') + '...'))
 37             else:
 38                 log('publish to {0} by {1}: {2}'.format(chan, ident, payload))
 39 
 40         def on_error(payload):                          # 記錄錯誤信息
 41             log('Error message from broker: {0}'.format(payload))
 42             hpc.stop()                                  # 停止循環
 43 
 44         hpc.subscribe(opts.channels)
 45         try: hpc.run(on_message, on_error)      # 接收server推送過來的數據,調用on_message(),on_error()進行處理
 46         except hpfeeds.FeedException, e:        # 拋出異常,程序退出
 47             log('Error: {0}'.format(e))
 48             return 1
 49 
 50     elif action == 'publish':                   # 發布請求
 51         hpc.publish(opts.channels, pubdata)     # 推送數據
 52         emsg = hpc.wait()                       # 若推送成功,返回None;否則,返回其它值
 53         if emsg: print 'got error from server:', emsg
 54 
 55     elif action == 'sendfile':
 56         pubfile = open(pubdata, 'rb').read()
 57         hpc.publish(opts.channels, pubfile)
 58 
 59     log('closing connection.')
 60     hpc.close()
 61 
 62     return 0
 63 
 64 def opts():                         # 獲取命令行參數
 65     usage = "usage: %prog -i ident -s secret --host host -p port -c channel1 [-c channel2, ...] <action> [<data>]"
 66     parser = optparse.OptionParser(usage=usage)
 67     parser.add_option("-c", "--chan",                       # 定義命令行參數
 68         action="append", dest='channels', nargs=1, type='string',
 69         help="channel (can be used multiple times)")
 70     parser.add_option("-i", "--ident",
 71         action="store", dest='ident', nargs=1, type='string',
 72         help="authkey identifier")
 73     parser.add_option("-s", "--secret",
 74         action="store", dest='secret', nargs=1, type='string',
 75         help="authkey secret")
 76     parser.add_option("--host",
 77         action="store", dest='host', nargs=1, type='string',
 78         help="broker host")
 79     parser.add_option("-p", "--port",
 80         action="store", dest='port', nargs=1, type='int',
 81         help="broker port")
 82     parser.add_option("-o", "--output",
 83         action="store", dest='output', nargs=1, type='string',
 84         help="publish log filename")
 85     parser.add_option("--certfile",
 86         action="store", dest='certfile', nargs=1, type='string',
 87         help="certfile for ssl verification (CA)", default=None)
 88     parser.add_option("--debug",
 89         action="store_const", dest='debug',
 90         help="enable debug log output", default=False, const=True)
 91 
 92     options, args = parser.parse_args()                     # 定義好所有的命令行參數,調用 parse_args()來解析程序的命令行
 93 
 94     if len(args) < 1:
 95         parser.error('You need to give "subscribe" or "publish" as <action>.')
 96     if args[0] not in ['subscribe', 'publish', 'sendfile']:
 97         parser.error('You need to give "subscribe" or "publish" as <action>.')
 98     if options.debug:
 99         logging.basicConfig(level=logging.DEBUG)
100     else:
101         logging.basicConfig(level=logging.CRITICAL)
102 
103     action = args[0]
104     data = None
105     if action == 'publish':
106         data = ' '.join(args[1:])
107     elif action == 'sendfile':
108         data = ' '.join(args[1:])
109 
110     return options, action, data
111 
112 if __name__ == '__main__':
113     options, action, data = opts()      # 獲取命令行參數
114     try:
115         sys.exit(main(options, action, pubdata=data))   # 從main()函數開始執行
116     except KeyboardInterrupt:
117         sys.exit(0)

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM