通信協議TLV的介紹及在python下的代碼實現及仿真


 TLV協議是一種通訊協議,一般將數據封裝成TLV的形式,即Tag,Length,Value。協議就是指通信雙方對數據傳輸控制的一種規定,規定了數據格式,同步方式,傳送速度,傳送步驟的問題作出統一的規定。可以理解為兩個節點之間為了協同工作,協商一定的規則和約定。例如我們會規定字節序,各個字段類型等。

TLV 是一種可變的格式,其中:

  • T 可以理解為 Tag 或 Type ,用於標識標簽或者編碼格式信息;
  • L 定義數值的長度;
  • V 表示實際的數值。

T 和 L 的長度固定,一般是2或4個字節,V 的長度由 Length 指定。

圖例幀格式如下所示:

 

由於用到這塊,我就自己弄了一個python下的仿真代碼,這里就給大家demo一下了:

 服務端:

  

import socket
import threading
import pickle

import time
from TLV import *
# 定義保存所有socket的列表
socket_list = []
# 創建socket對象
ss = socket.socket()
# 將socket綁定到本機IP和端口
ss.bind(('localhost', 2333))
# 服務端開始監聽來自客戶端的連接
ss.listen()
tlv = TLV(t_ext=7, l_ext=7)

def server_target(s):
    try:
        # 采用循環不斷地從socket中讀取客戶端發送過來的數據
        while True:
            line = input()
            if line is None or line == 'exit':
                break

            time.sleep(2)
            tlv.add(8,line)
            data = pickle.dumps(tlv)
            s.send(data)
    except Exception:
        print(Exception.with_traceback())
while True:
    # 此行代碼會阻塞,將一直等待別人的連接
    s, addr = ss.accept()
    #socket_list.append(s)
    # 每當客戶端連接后啟動一個線程為該客戶端服務
    threading.Thread(target=server_target, args=(s, )).start()

 

   客戶端:

 

import socket
import threading
import pickle
from TLV import *
# 創建socket對象
s = socket.socket()
# 連接遠程主機
s.connect(('localhost', 2333))

def read_from_server(s):
    try:
        data = pickle.loads(s.recv(2048))
        # test
        tlvp = TLVParser(data.buffer, t_ext=7, l_ext=7)
        for avp in tlvp.parse():
            print("%d(%d): %s" % (avp["type"], avp["length"], avp["value"]))
        # return s.recv(2048).decode('utf-8')
        return tlvp
    # 如果捕獲到異常,則表明該socket對應的客戶端已經關閉
    except:
        # 刪除該socket
        socket_list.remove(s)   #

def read_server(s):
    try:
        while True:
                contend = read_from_server(s)
                if contend is None:
                        break
    except:
        print(Exception.with_traceback())

# 客戶端啟動線程不斷地讀取來自服務器的數據
threading.Thread(target=read_server, args=(s, )).start()   #

 

 TLV的實現:

 

from scapy.all import *

class TLVError(Exception):
    pass


class TLV:
    def __init__(self, tl_in_l=False, t_ext=0, l_ext=0):
        self.buffer = ""
        self.tl_in_l = tl_in_l
        self.t_ext = t_ext
        self.l_ext = l_ext


    def _int(self, i, ext):
        maxi = 1<<8
        if ext > 0:
            maxi = (1 << ext)
        holdstr = ""
        holder = i 
        extend = 0 
        count = 1 
        while holder >= maxi:
            count += 1
            newnum = (holder & (maxi - 1)) 
            holdstr = chr(newnum | extend) + holdstr
            extend = maxi
            holder /= maxi

        holdstr = chr(holder | extend) + holdstr
        return holdstr

    def _t(self, t):
        if self.t_ext == 0 and t > 256:
            raise TLVError("type > 256 and no extension bit set")
        return self._int(t, self.t_ext)


    def _l(self, l):
        if self.l_ext == 0 and l > 256:
            raise TLVError("length > 256 and no extension bit set")
        return self._int(l, self.l_ext)


    def add(self, t, v, l=None):
        self.buffer += self._t(t)
        length = 0 if l is None else l

        if self.tl_in_l:
            length += t

        if l is None:
            length += len(v)

        self.buffer += self._l(length)
        self.buffer += v


    def __str__(self):
        return self.buffer

    def __repr__(self):
        return self.buffer


class TLVParser:
    def __init__(self, buffer, tl_in_l=False, t_ext=0, l_ext=0):
        self.buffer = buffer
        self.tl_in_l = tl_in_l
        self.t_ext = t_ext
        self.l_ext = l_ext
        self.offset = 0


    def _get_i(self, i_ext):
        try:
            byte = ord(self.buffer[self.offset])
        except IndexError:
            raise TLVError("Not enough data")
        ext = 1 << (i_ext if i_ext > 0 else 8)
        i = 0
        while byte & ext:
            i += (byte & (ext - 1))
            i <<= i_ext
            self.offset += 1
            try:
                byte = ord(self.buffer[self.offset])
            except IndexError:
                raise TLVError("Not enough data")
        i += byte
        self.offset += 1
        return i


    def _get_tlv(self):
        t = self._get_i(self.t_ext)
        l = self._get_i(self.l_ext)
        if self.offset + l > len(self.buffer):
            raise TLVError("Buffer not long enough to encompass TLV")
        v = self.buffer[self.offset:self.offset+l]
        self.offset += l
        return (t, l, v)


    def parse(self):
        while self.offset < len(self.buffer):
            t, l, v = self._get_tlv()
            yield {
                "type": t,
                "length": l,
                "value": v,
            }

# Test/example program for building TLVs and parsing the TLVs
if __name__ == "__main__":
    tlv = TLV(t_ext=7, l_ext=7)
    tlv.add(10, "Foobar")
    tlv.add(16, "Bladibla")
    # hexdump(tlv)
    tlvp = TLVParser(tlv.buffer, t_ext=7, l_ext=7)
    for avp in tlvp.parse():
        print ("%d(%d): %s" % (avp["type"], avp["length"], avp["value"]))

  具體的代碼運行結果就不貼了,對懂得python的同學來說,這個很簡單的。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM