TLV協議是一種通訊協議,一般將數據封裝成TLV的形式,即Tag,Length,Value。協議就是指通信雙方對數據傳輸控制的一種規定,規定了數據格式,同步方式,傳送速度,傳送步驟的問題作出統一的規定。可以理解為兩個節點之間為了協同工作,協商一定的規則和約定。例如我們會規定字節序,各個字段類型等。
TLV 是一種可變的格式,其中:
T
可以理解為Tag
或Type
,用於標識標簽或者編碼格式信息;L
定義數值的長度;V
表示實際的數值。
T
和 L
的長度固定,一般是2或4個字節,V
的長度由 Length
指定。
圖例幀格式如下所示:
由於用到這塊,我就自己弄了一個python下的仿真代碼,這里就給大家demo一下了:
服務端:
import socket import threading import pickle import time from TLV import * # 定義保存所有socket的列表 socket_list = [] # 創建socket對象 ss = socket.socket() # 將socket綁定到本機IP和端口 ss.bind(('localhost', 2333)) # 服務端開始監聽來自客戶端的連接 ss.listen() tlv = TLV(t_ext=7, l_ext=7) def server_target(s): try: # 采用循環不斷地從socket中讀取客戶端發送過來的數據 while True: line = input() if line is None or line == 'exit': break time.sleep(2) tlv.add(8,line) data = pickle.dumps(tlv) s.send(data) except Exception: print(Exception.with_traceback()) while True: # 此行代碼會阻塞,將一直等待別人的連接 s, addr = ss.accept() #socket_list.append(s) # 每當客戶端連接后啟動一個線程為該客戶端服務 threading.Thread(target=server_target, args=(s, )).start()
客戶端:
import socket import threading import pickle from TLV import * # 創建socket對象 s = socket.socket() # 連接遠程主機 s.connect(('localhost', 2333)) def read_from_server(s): try: data = pickle.loads(s.recv(2048)) # test tlvp = TLVParser(data.buffer, t_ext=7, l_ext=7) for avp in tlvp.parse(): print("%d(%d): %s" % (avp["type"], avp["length"], avp["value"])) # return s.recv(2048).decode('utf-8') return tlvp # 如果捕獲到異常,則表明該socket對應的客戶端已經關閉 except: # 刪除該socket socket_list.remove(s) # ① def read_server(s): try: while True: contend = read_from_server(s) if contend is None: break except: print(Exception.with_traceback()) # 客戶端啟動線程不斷地讀取來自服務器的數據 threading.Thread(target=read_server, args=(s, )).start() # ①
TLV的實現:
from scapy.all import * class TLVError(Exception): pass class TLV: def __init__(self, tl_in_l=False, t_ext=0, l_ext=0): self.buffer = "" self.tl_in_l = tl_in_l self.t_ext = t_ext self.l_ext = l_ext def _int(self, i, ext): maxi = 1<<8 if ext > 0: maxi = (1 << ext) holdstr = "" holder = i extend = 0 count = 1 while holder >= maxi: count += 1 newnum = (holder & (maxi - 1)) holdstr = chr(newnum | extend) + holdstr extend = maxi holder /= maxi holdstr = chr(holder | extend) + holdstr return holdstr def _t(self, t): if self.t_ext == 0 and t > 256: raise TLVError("type > 256 and no extension bit set") return self._int(t, self.t_ext) def _l(self, l): if self.l_ext == 0 and l > 256: raise TLVError("length > 256 and no extension bit set") return self._int(l, self.l_ext) def add(self, t, v, l=None): self.buffer += self._t(t) length = 0 if l is None else l if self.tl_in_l: length += t if l is None: length += len(v) self.buffer += self._l(length) self.buffer += v def __str__(self): return self.buffer def __repr__(self): return self.buffer class TLVParser: def __init__(self, buffer, tl_in_l=False, t_ext=0, l_ext=0): self.buffer = buffer self.tl_in_l = tl_in_l self.t_ext = t_ext self.l_ext = l_ext self.offset = 0 def _get_i(self, i_ext): try: byte = ord(self.buffer[self.offset]) except IndexError: raise TLVError("Not enough data") ext = 1 << (i_ext if i_ext > 0 else 8) i = 0 while byte & ext: i += (byte & (ext - 1)) i <<= i_ext self.offset += 1 try: byte = ord(self.buffer[self.offset]) except IndexError: raise TLVError("Not enough data") i += byte self.offset += 1 return i def _get_tlv(self): t = self._get_i(self.t_ext) l = self._get_i(self.l_ext) if self.offset + l > len(self.buffer): raise TLVError("Buffer not long enough to encompass TLV") v = self.buffer[self.offset:self.offset+l] self.offset += l return (t, l, v) def parse(self): while self.offset < len(self.buffer): t, l, v = self._get_tlv() yield { "type": t, "length": l, "value": v, } # Test/example program for building TLVs and parsing the TLVs if __name__ == "__main__": tlv = TLV(t_ext=7, l_ext=7) tlv.add(10, "Foobar") tlv.add(16, "Bladibla") # hexdump(tlv) tlvp = TLVParser(tlv.buffer, t_ext=7, l_ext=7) for avp in tlvp.parse(): print ("%d(%d): %s" % (avp["type"], avp["length"], avp["value"]))
具體的代碼運行結果就不貼了,對懂得python的同學來說,這個很簡單的。