python與C結構體之間數據轉換
前言
在實際應用中,可能會遇到直接和C進行二進制字節流協議通信,這時要把數據解包成python數據,如果可能,最好與C定義的結構體完全對應上.
python中有2種方式,可處理二進制數據轉換
- 用ctypes包的
Structure
直接定義結構體 - 用struct包的
pack/unpack
函數組裝轉換
在轉換時一定要注意字節序,這兩種方式都有各自的方法標志字節序.
使用ctypes包
ctypes中有許多C中的操作接口,如sizeof
,memmove
等,也提供近似C結構體的模擬類Structure
,BigEndianStructure
,Union
,顯然的是BigEndianStructure
是網絡字節序(大端),方便直接用於網絡傳輸,Union
和Structure
是主機序(可能是大端,也可能是小端,和本機有關).
Structure/BigEndianStructure使用
from ctypes import *
class SSHead(BigEndianStructure):
_pack_ = 1
_fields_ = [
#(字段名, c類型 )
('nTotalSize', c_uint32),
('nSourceID', c_int32),
('sourceType', c_uint8),
('destType', c_uint8),
('transType', c_uint8),
('nDestID', c_int32),
('nFlag', c_uint8),
('nOptionalLength', c_uint16),
('arrOptional', c_char * 20),
]
def encode(self):
return string_at(addressof(self), sizeof(self))
def decode(self, data):
memmove(addressof(self), data, sizeof(self))
return len(data)
# -------------------
# 使用
sshead = SSHead()
sshead.nSourceID = 20 #省略其他賦值
buf = sshead.encode()
ss = SSHead()
ss.decode(buf)
print(ss.nSourceID)
以上就是一個簡單協議結構體定義,對應的C版本如下
struct SSHead
{
uint32_t nTotalSize;
int32_t nSourceID;
uint8_t sourceType;
uint8_t destType;
uint8_t transType;
int32_t nDestID;
int8_t nFlag;
uint16_t nOptionalLength;
char arrOptional[20];
//簡單模擬python的打包解包
int encode(char* buf, size_t max_len)
{
memmove(buf, this, sizeof(this));
return 0;
}
int decode(char* buf, size_t len)
{
memmove(this, buf, len);
return 0;
}
}
// c中對應的 打包/解包流程(假設本機字節序為大端)
SSHead sshead = {0};
sshead.nSourceID = 20;
char buf[1024];
sshead.encode(buf);
SSHead ss = {0};
ss.decode(buf, sizeof(ss));
其中_pack_ = 1
表示1字節對齊,不然可能會被填充,導致結構體實際所占字節數與表面上的不一樣.
_fields_
定義C結構體中相對應的字段名和類型,C中每種基礎類型在ctypes都有與之對應的類型,如c_uint32
對應uint32_t
,占4個字節.數組就是后面乘以對應的長度即可,如c_uint8 * 20
.另外還支持嵌套定義結構體.在實例化后,字段名會成為成員變量,可直接賦值.
encode
會直接得到該對象的二進制數據,如果不考慮字節序,則與C中相同對象的二進制數據是一樣的
decode
相反,直接解包二進制數據為python數據
這樣python和c就可以直接通過結構體定義協議通信了.
注意
- python中的二進制數據是bytes類型,不是str類型
- 在python3.6及之前的版本,是沒有
BigEndianUnion
類型 - 用來網絡傳輸一定要用
BigEndianStructure
,不然會有字節序問題
缺點
此方法只能適用於結構體固定打解包的情況,如果協議中有大數組,但數組中的數據只有前幾個是有效的,后面都是無效的,一般在打包的時候只打包有效數據,這種情況用Structure
就不合適了.
使用struct包
struct模塊是專門用來處理python與C之間的二進制數據轉換,總共只有幾個函數
下面在原有的SSHead定義中增加2個使用struct打包解包的函數
from ctypes import *
import struct
class SSHead(BigEndianStructure):
_pack_ = 1
_fields_ = [
#(字段名, c類型 )
('nTotalSize', c_uint32),
('nSourceID', c_int32),
('sourceType', c_uint8),
('destType', c_uint8),
('transType', c_uint8),
('nDestID', c_int32),
('nFlag', c_uint8),
('nOptionalLength', c_uint16),
('arrOptional', c_char * 20),
]
def encode(self):
return string_at(addressof(self), sizeof(self))
def decode(self, data):
memmove(addressof(self), data, sizeof(self))
return len(data)
def pack(self):
buffer = struct.pack("!IIBBBIBH20s", self.nTotalSize, self.nSourceID, self.sourceType
, self.destType, self.transType, self.nDestID, self.nFlag, self.nOptionalLength, self.arrOptional)
return buffer
def unpack(self, data):
(self.nTotalSize, self.nSourceID, self.sourceType, self.destType, self.transType, self.nDestID,
self.nFlag, self.nOptionalLength, self.arrOptional) = struct.unpack("!IIBBBIBH20s", data)
# ---------------------------
# 測試
s = SSHead()
s.arrOptional = b'hello'
ss = SSHead()
ss.unpack(s.encode())
print(ss.arrOptional)
pack/unpack的fmt(格式化串)說明
"!IIBBBIBH20B"
:!
表示按照網絡序處理,I
表示后面的第一變量為4字節的int
型,接着的B
表示為下一個變量為1字節的uint8_t
型,以此類推,20s
表示后面是長度20的字節數組
其他參數可參考官方文檔.
缺點
上面的例子中如果使用pakc/unpack
方法,是不用繼承BigEndianStructure
,只需自定義相應字段變量.
可以看到,struct.pack/unpack必須對每個字段代表什么類型,幾個字節進行描述.與Structure
相比,比較靈活,可以自由組合怎么打包,比如在nOptionalLength=0
時,不打包arrOptional
字段.缺點就是,定義pack/unpack
函數時,協議多起來會非常繁瑣且容易出錯.所以最好是自動化生成pack/unpack
函數.
自動化生成pack/unpack
定義結構體成員列表
顯然,我們需要知道結構體成員的變量名和類型,參考Structure
,有如下定義
class BaseCode(object):
_type_map_index_pack_tag = 1
_type_map_index_pack_size = 2
_type_map = {
# C類型:(說明, 編碼標志)
'char': ('int', 'B'),
'uint32_t': ('int', 'I'),
'string': ('str', 'B'),
'int32_t': ('int', 'i'),
'int64_t': ('int', 'q'),
'uint64_t': ('int', 'Q'),
'float': ('float', 'f'),
'double': ('double', 'd'),
}
# 每種基礎類型所占字節數
_ctype_size_map = {'I': 4, 'B': 1, 'i': 4, 'b': 1, 'Q': 8, 'q': 8, 'f': 4, 'd': 8}
_fields_index_ctype = 0
_fields_index_value_name = 1
_fields_index_array_length = 2
# 測試
_fields = [
# (C類型, 變量名)
('uint32_t', 'nUint'),
('string', 'szString', '_Const.enmMaxAccountIDLength'),
('int32_t', 'nInt3'),
('uint32_t', 'nUintArray', 4),
]
按序遍歷_fields中的字段
對_fields中的每個元素,進行編碼,通過變量名可獲得實際變量值,通過C類型利用struct.pack/unpack
可獲得實際編碼
下面是添加的類成員函數encode
def encode(self, nest=1):
data = b''
tmp = b''
debug_log("&" * nest, self.__class__.__name__, "encode struct start :")
for one in self._fields:
debug_log("#" * nest, "encode one element:", one)
ctype = one[self._fields_index_ctype]
value = getattr(self, one[self._fields_index_value_name])
if len(one) == 3:
length = one[self._fields_index_array_length]
if type(length) == str:
length = eval(length)
tmp = self._encode_array(ctype, value, length)
else:
# 不是基礎類型,即嵌套定義
if ctype not in BaseCode._type_map:
tmp = value.encode(nest+1)
else:
fmt = '!' + self._type_map[ctype][self._type_map_index_pack_tag]
tmp = struct.pack(fmt, value)
# debug_log(fmt, type(value), value)
debug_log("#" * nest,"encode one element:", len(tmp), tmp)
data += tmp
debug_log("&" * nest, self.__class__.__name__, "encode end: len=", len(data), data)
return data
def _encode_array(self, ctype, value, max_length):
"""
打包數組
如果是字符串類型 需要做下特殊處理
:param ctype:
:param value:
:param max_length:
:return:
"""
debug_log('ctype:', ctype, type(ctype))
if ctype == 'string':
max_length -= 1 # 字符串長度需要減一
value = bytes(value, encoding='utf8')
#print(value)
if len(value) > max_length:
raise EncodeError('the length of array is too long')
# pack長度
data = struct.pack('!H', len(value))
debug_log("array count:", len(value), "value:", value, type(value))
# pack數組內容
for one in value:
#debug_log("self._type_map[ctype][1]=", self._type_map[ctype][self._type_map_index_pack_tag], one)
if ctype not in BaseCode._type_map:
data += one.encode()
else:
data += struct.pack('!' + self._type_map[ctype][self._type_map_index_pack_tag], one)
return data
數組類型在python中使用list
表示,在打包數組類型之前會添加2字節表示數組長度
字符串類型轉換為bytes
類型,然后就和普通數組一樣,一個元素一個元素處理(實際在for遍歷中,一個元素是一個int
,和C中一樣,所以用B
標志打包)
當c類型不是_type_map
中的基礎類型,那就是自定義的結構體類型,然后嵌套調用encode就可以了
目前沒有考慮union
的處理
解碼,反向處理
def decode(self, data, offset=0, nest=1):
"""
:param data:
:return:
"""
debug_log("&" * nest, self.__class__.__name__, "decode struct start :")
for one in self._fields:
debug_log("#" * nest, "decode one element:", one)
ctype = one[self._fields_index_ctype]
if len(one) == 3:
offset = self._decode_array(one, data, offset, nest)
else:
ctype_attr = self._type_map[ctype]
if ctype not in BaseCode._type_map:
value = eval(ctype + '()')
offset = value.decode(data, offset, nest)
setattr(self, one[self._fields_index_value_name], value)
else:
fmt = '!' + ctype_attr[self._type_map_index_pack_tag]
value, = struct.unpack_from(fmt, data, offset)
offset += self._ctype_size_map[ctype_attr[self._type_map_index_pack_tag]]
debug_log(one, one[self._fields_index_value_name])
setattr(self, one[self._fields_index_value_name], value)
debug_log("#" * nest, "decode one element end:", offset, one)
return offset
def _decode_array(self, field, data, offset, nest):
ctype = field[self._fields_index_ctype]
array_num, = struct.unpack_from('!H', data, offset)
offset += 2
value = []
ctype_attr = self._type_map[ctype]
debug_log("$" * nest, "decode array count", array_num, field)
while array_num > 0:
array_num -= 1
if ctype not in BaseCode._type_map:
one = eval(ctype + '()')
offset = one.decode(data, offset, nest)
value.append(one)
else:
one, = struct.unpack_from('!' + ctype_attr[self._type_map_index_pack_tag], data, offset)
value.append(one)
offset += self._ctype_size_map[ctype_attr[self._type_map_index_pack_tag]]
if ctype == 'string':
# 這里是因為字符串是按照單個字符解包,會解成python的int,通過chr()轉化為字符型
# value = [97,98]
# list(map(chr,value)) 后等於 ['a','b']
# ''.join() 就轉成'ab'
value = ''.join(list(map(chr, value)))
value = bytes(value, encoding='latin1').decode('utf8')
setattr(self, field[self._fields_index_value_name], value)
debug_log("$" * nest, "decode array ok", array_num, field)
return offset
最后
完整代碼:https://gitee.com/iclodq/codes/e81qfpxw3dnkju594yi6b90
包含簡單測試和轉成字典結構
在python3.5下運行成功
希望幫到各位!!