What the codecs module does
It provides the encoders and decoders used to convert text between Unicode strings and different byte representations.
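As a minimal orientation sketch (not one of the numbered examples below), the module-level `codecs.encode()` and `codecs.decode()` helpers mirror `str.encode()` and `bytes.decode()`:

```python
import codecs

text = 'français'

# codecs.encode()/codecs.decode() mirror str.encode()/bytes.decode()
data = codecs.encode(text, 'utf-8')
print(data)                          # b'fran\xc3\xa7ais'
print(codecs.decode(data, 'utf-8'))  # français
```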
1. Converting bytes to hexadecimal, grouped at a specified interval

```python
import binascii


def to_hex(t, nbytes):
    # Number of hex characters in each group
    chars_per_item = nbytes * 2
    # Get the hexadecimal representation of the data
    hex_version = binascii.hexlify(t)
    # Slice the hex data at the chosen interval and join the groups with spaces
    return b' '.join(
        hex_version[start:start + chars_per_item]
        for start in range(0, len(hex_version), chars_per_item)
    )


if __name__ == '__main__':
    print(to_hex(b'abcdef', 1))
    print(to_hex(b'abcdef', 2))
```
Output:

```
b'61 62 63 64 65 66'
b'6162 6364 6566'
```
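As an aside (not part of the original example), on Python 3.8 and later the same grouped hex dump can be produced without `binascii` by using `bytes.hex()` with a separator:

```python
# Python 3.8+: bytes.hex() accepts a separator and a group size,
# returning a str instead of bytes.
print(b'abcdef'.hex(' ', 1))   # 61 62 63 64 65 66
print(b'abcdef'.hex(' ', 2))   # 6162 6364 6566
```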
2. Encoding examples with UTF-8 and UTF-16

```python
import binascii
import unicodedata


def to_hex(t, nbytes):
    # Same hex-dump helper as in example 1
    chars_per_item = nbytes * 2
    hex_version = binascii.hexlify(t)
    return b' '.join(
        hex_version[start:start + chars_per_item]
        for start in range(0, len(hex_version), chars_per_item)
    )


if __name__ == '__main__':
    text = 'français'
    print('Original data : {!r}'.format(text))
    for c in text:
        # Print each character's name from the Unicode database
        print(' {!r}: {}'.format(c, unicodedata.name(c, c)))
    # Encode with UTF-8
    print('UTF-8 : {}'.format(to_hex(text.encode('utf-8'), 1)))
    # Encode with UTF-16
    print('UTF-16 : {}'.format(to_hex(text.encode('utf-16'), 2)))
```
Output:

```
Original data : 'français'
 'f': LATIN SMALL LETTER F
 'r': LATIN SMALL LETTER R
 'a': LATIN SMALL LETTER A
 'n': LATIN SMALL LETTER N
 'ç': LATIN SMALL LETTER C WITH CEDILLA
 'a': LATIN SMALL LETTER A
 'i': LATIN SMALL LETTER I
 's': LATIN SMALL LETTER S
UTF-8 : b'66 72 61 6e c3 a7 61 69 73'
UTF-16 : b'fffe 6600 7200 6100 6e00 e700 6100 6900 7300'
```
3. Decoding example

```python
import binascii


def to_hex(t, nbytes):
    # Same hex-dump helper as in example 1
    chars_per_item = nbytes * 2
    hex_version = binascii.hexlify(t)
    return b' '.join(
        hex_version[start:start + chars_per_item]
        for start in range(0, len(hex_version), chars_per_item)
    )


if __name__ == '__main__':
    text = 'français'
    encoded = text.encode('utf-8')
    decoded = encoded.decode('utf-8')
    print('Original data :', repr(text))
    print('Encoded content :', to_hex(encoded, 1), type(encoded))
    print('Decoded content :', repr(decoded), type(decoded))
```
Output:

```
Original data : 'français'
Encoded content : b'66 72 61 6e c3 a7 61 69 73' <class 'bytes'>
Decoded content : 'français' <class 'str'>
```
4. Opening a file with codecs and writing content with a specified encoding

```python
import binascii
import codecs
import sys


def to_hex(t, nbytes):
    # Same hex-dump helper as in example 1
    chars_per_item = nbytes * 2
    hex_version = binascii.hexlify(t)
    return b' '.join(
        hex_version[start:start + chars_per_item]
        for start in range(0, len(hex_version), chars_per_item)
    )


encoding = sys.argv[1]
filename = encoding + '.txt'

print('Writing to file', filename)
# Open a file handle with the requested encoding and write the text
with codecs.open(filename, mode='w', encoding=encoding) as wf:
    wf.write('français')

# Group the hex dump by the code unit size of the encoding
nbytes = {
    'utf-8': 1,
    'utf-16': 2,
    'utf-32': 4,
}.get(encoding, 1)

print('Reading the file contents')
with open(filename, mode='rb') as rf:
    print(to_hex(rf.read(), nbytes))
```
Output:

```
Writing to file utf-8.txt
Reading the file contents
b'66 72 61 6e c3 a7 61 69 73'
```
5. Reading file contents with a specified decoding

```python
import codecs
import sys

encoding = sys.argv[1]
filename = encoding + '.txt'

print('Reading from file', filename)
# Open a file handle with the requested encoding and read the content
with codecs.open(filename, mode='r', encoding=encoding) as rf:
    print(repr(rf.read()))
```
Output:

```
Reading from file utf-8.txt
'français'
```
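As an aside (not in the original post), on Python 3 the built-in `open()` also accepts an `encoding` argument and is generally preferred to `codecs.open()` for ordinary file I/O. A minimal equivalent sketch, reusing the `utf-8.txt` file name from above:

```python
# Built-in open() with an explicit encoding does the same job on Python 3
with open('utf-8.txt', mode='w', encoding='utf-8') as wf:
    wf.write('français')

with open('utf-8.txt', mode='r', encoding='utf-8') as rf:
    print(repr(rf.read()))   # 'français'
```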
6. Printing the byte order marks (BOMs)

```python
import binascii
import codecs


def to_hex(t, nbytes):
    # Same hex-dump helper as in example 1
    chars_per_item = nbytes * 2
    hex_version = binascii.hexlify(t)
    return b' '.join(
        hex_version[start:start + chars_per_item]
        for start in range(0, len(hex_version), chars_per_item)
    )


BOM_TYPES = [
    'BOM', 'BOM_BE', 'BOM_LE',
    'BOM_UTF8',
    'BOM_UTF16', 'BOM_UTF16_BE', 'BOM_UTF16_LE',
    'BOM_UTF32', 'BOM_UTF32_BE', 'BOM_UTF32_LE',
]

for name in BOM_TYPES:
    print('{:12} : {}'.format(
        name,
        # Look up the BOM constant on the codecs module by name
        to_hex(getattr(codecs, name), 2)
    ))
```
Output (on a little-endian machine):

```
BOM          : b'fffe'
BOM_BE       : b'feff'
BOM_LE       : b'fffe'
BOM_UTF8     : b'efbb bf'
BOM_UTF16    : b'fffe'
BOM_UTF16_BE : b'feff'
BOM_UTF16_LE : b'fffe'
BOM_UTF32    : b'fffe 0000'
BOM_UTF32_BE : b'0000 feff'
BOM_UTF32_LE : b'fffe 0000'
```
7. Byte order is detected and handled automatically by the codec's decoder, but an explicit byte order can be selected when encoding.

```python
import binascii
import codecs


def to_hex(t, nbytes):
    # Same hex-dump helper as in example 1
    chars_per_item = nbytes * 2
    hex_version = binascii.hexlify(t)
    return b' '.join(
        hex_version[start:start + chars_per_item]
        for start in range(0, len(hex_version), chars_per_item)
    )


# Pick the non-native byte order, so the file needs an explicit BOM
if codecs.BOM_UTF16 == codecs.BOM_UTF16_BE:
    bom = codecs.BOM_UTF16_LE
    encoding = 'utf_16_le'
else:
    bom = codecs.BOM_UTF16_BE
    encoding = 'utf_16_be'

print('Native order', to_hex(codecs.BOM_UTF16, 2))
print('Selected order', to_hex(bom, 2))

# Encode the text using the selected (non-native) byte order
encoded_text = 'français'.encode(encoding)
print('{:14} : {}'.format(encoding, to_hex(encoded_text, 2)))

with open('nonnative-encoded.txt', mode='wb') as wf:
    # Write the BOM first, then the encoded text
    wf.write(bom)
    wf.write(encoded_text)
```
Output:

```
Native order b'fffe'
Selected order b'feff'
utf_16_be      : b'0066 0072 0061 006e 00e7 0061 0069 0073'
```
8. When the file is opened without specifying a byte order, the decoder uses the BOM in the first two bytes of the file to determine it.

```python
import binascii
import codecs


def to_hex(t, nbytes):
    # Same hex-dump helper as in example 1
    chars_per_item = nbytes * 2
    hex_version = binascii.hexlify(t)
    return b' '.join(
        hex_version[start:start + chars_per_item]
        for start in range(0, len(hex_version), chars_per_item)
    )


# Look at the raw bytes, including the BOM written by the previous example
with open('nonnative-encoded.txt', mode='rb') as rf:
    raw_bytes = rf.read()
print('Raw :', to_hex(raw_bytes, 2))

# Re-open without specifying a byte order; the decoder reads the BOM
with codecs.open('nonnative-encoded.txt', encoding='utf-16') as f:
    decoded_text = f.read()
print('Decoded data', repr(decoded_text))
```
Output:

```
Raw : b'feff 0066 0072 0061 006e 00e7 0061 0069 0073'
Decoded data 'français'
```
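A small sketch of the same auto-detection at the bytes level (not part of the original post): the endianness-neutral `'utf-16'` codec consumes the BOM, while an explicit `'utf-16-be'` codec treats it as an ordinary character.

```python
import codecs

# Build big-endian data that starts with a big-endian BOM
data = codecs.BOM_UTF16_BE + 'a'.encode('utf-16-be')

# 'utf-16' detects the BOM, strips it, and decodes the rest accordingly
print(data.decode('utf-16'))            # 'a'

# 'utf-16-be' assumes the order and keeps the BOM as U+FEFF in the result
print(repr(data.decode('utf-16-be')))   # '\ufeffa'
```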
9. Encoding error handling modes
| Error mode | Description |
|---|---|
| strict | Raise an exception if the data cannot be converted. |
| replace | Replace data that cannot be converted with a special marker character. |
| ignore | Skip the data that cannot be converted. |
| xmlcharrefreplace | XML character reference (encoding only) |
| backslashreplace | Escape sequence (encoding only) |
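The last two handlers in the table can be seen directly with `str.encode()`; a minimal sketch, not part of the original examples:

```python
text = 'français'

# Replace the unencodable character with an XML character reference
print(text.encode('ascii', errors='xmlcharrefreplace'))
# b'fran&#231;ais'

# Replace it with a Python backslash escape sequence
print(text.encode('ascii', errors='backslashreplace'))
# b'fran\xe7ais'
```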
10. Handling encoding errors

```python
import codecs
import sys

error_handling = sys.argv[1]
text = 'français'

try:
    # Open the file with codecs, forcing ASCII encoding and the
    # selected error handler
    with codecs.open('encode_error.txt', 'w',
                     encoding='ascii',
                     errors=error_handling) as f:
        f.write(text)
except UnicodeEncodeError as err:
    print('ERROR:', err)
else:
    # If there was no error writing to the file,
    # show what it contains.
    with open('encode_error.txt', 'rb') as f:
        print('File contents: {!r}'.format(f.read()))
```
Output:

```
$ python3 codecs_encode_error.py strict
ERROR: 'ascii' codec can't encode character '\xe7' in position 4: ordinal not in range(128)

$ python3 codecs_encode_error.py replace
File contents: b'fran?ais'

$ python3 codecs_encode_error.py ignore
File contents: b'franais'

$ python3 codecs_encode_error.py xmlcharrefreplace
File contents: b'fran&#231;ais'
```
11. Handling decoding errors

```python
import binascii
import codecs
import sys


def to_hex(t, nbytes):
    # Same hex-dump helper as in example 1
    chars_per_item = nbytes * 2
    hex_version = binascii.hexlify(t)
    return b' '.join(
        hex_version[start:start + chars_per_item]
        for start in range(0, len(hex_version), chars_per_item)
    )


error_handling = sys.argv[1]
text = 'français'

print('Original data :', repr(text))

# Save the text using one encoding
with codecs.open('decode_error.txt', 'w', encoding='utf-16') as f:
    f.write(text)

# Dump the raw bytes of the file as hex
with open('decode_error.txt', 'rb') as f:
    print('File contents:', to_hex(f.read(), 1))

# Try to read the data back with the wrong encoding
with codecs.open('decode_error.txt', 'r',
                 encoding='utf-8',
                 errors=error_handling) as f:
    try:
        data = f.read()
    except UnicodeDecodeError as err:
        print('ERROR:', err)
    else:
        print('Read :', repr(data))
```
Output (with the `replace` handler):

```
Original data : 'français'
File contents: b'ff fe 66 00 72 00 61 00 6e 00 e7 00 61 00 69 00 73 00'
Read : '��f\x00r\x00a\x00n\x00�\x00a\x00i\x00s\x00'
```
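The same error handlers also work directly with `bytes.decode()`; a small sketch, not part of the original example, using the UTF-16 bytes shown above:

```python
raw = 'français'.encode('utf-16')

# 'strict' (the default) raises UnicodeDecodeError on the invalid UTF-8 bytes
try:
    raw.decode('utf-8')
except UnicodeDecodeError as err:
    print('ERROR:', err)

# 'replace' substitutes U+FFFD for undecodable bytes; 'ignore' drops them
print(repr(raw.decode('utf-8', errors='replace')))
print(repr(raw.decode('utf-8', errors='ignore')))
```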
12. Recoding between data_encoding and file_encoding with codecs.EncodedFile and io.BytesIO

```python
import binascii
import codecs
import io


def to_hex(t, nbytes):
    # Same hex-dump helper as in example 1
    chars_per_item = nbytes * 2
    hex_version = binascii.hexlify(t)
    return b' '.join(
        hex_version[start:start + chars_per_item]
        for start in range(0, len(hex_version), chars_per_item)
    )


data = 'français'
utf8 = data.encode('utf-8')
print('UTF-8 data as hex, grouped 1 byte at a time:', to_hex(utf8, 1))

# file_encoding='utf-16' is the encoding used for the underlying file/stream
output = io.BytesIO()
encoded_file = codecs.EncodedFile(output,
                                  data_encoding='utf-8',
                                  file_encoding='utf-16')
encoded_file.write(utf8)
utf16 = output.getvalue()
print('Value stored using file_encoding:', to_hex(utf16, 2))

# data_encoding='utf-8' is the encoding used for the data passed to write()
# and returned by read()
buffer = io.BytesIO(utf16)
encoded_file = codecs.EncodedFile(buffer,
                                  data_encoding='utf-8',
                                  file_encoding='utf-16')
recoded = encoded_file.read()
print('Value read using data_encoding:', to_hex(recoded, 1))
```
Output:

```
UTF-8 data as hex, grouped 1 byte at a time: b'66 72 61 6e c3 a7 61 69 73'
Value stored using file_encoding: b'fffe 6600 7200 6100 6e00 e700 6100 6900 7300'
Value read using data_encoding: b'66 72 61 6e c3 a7 61 69 73'
```
13. A non-Unicode encoding example (rot_13)

```python
import codecs
import io

# rot_13 is a text-to-text transform, so it wraps a StringIO, not a BytesIO
buffer = io.StringIO()
stream = codecs.getwriter('rot_13')(buffer)

text = 'abcdefghijklmnopqrstuvwxyz'
stream.write(text)
stream.flush()

print('Original data', text)
print('ROT_13', buffer.getvalue())
```
Output:

```
Original data abcdefghijklmnopqrstuvwxyz
ROT_13 nopqrstuvwxyzabcdefghijklm
```
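Since rot_13 is a str-to-str codec it cannot be used with `str.encode()`, but `codecs.encode()` and `codecs.decode()` accept it directly; a quick sketch, not from the original post:

```python
import codecs

print(codecs.encode('abcdefghijklmnopqrstuvwxyz', 'rot_13'))
# nopqrstuvwxyzabcdefghijklm
print(codecs.decode('nopqrstuvwxyzabcdefghijklm', 'rot_13'))
# abcdefghijklmnopqrstuvwxyz
```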
14. Compressing and decompressing data with the zlib codec

```python
import codecs
import io

# zlib is a bytes-to-bytes codec, so it wraps a BytesIO
buffer = io.BytesIO()
stream = codecs.getwriter('zlib')(buffer)

text = b'abcdefghijklmnopqrstuvwxyz\n' * 50
stream.write(text)
stream.flush()

print('Original data length', len(text))
compressed_data = buffer.getvalue()
print('zlib-compressed data length', len(compressed_data))

# Read the compressed data back through a decoding reader
buffer = io.BytesIO(compressed_data)
stream = codecs.getreader('zlib')(buffer)
first_line = stream.readline()
print('First line read', repr(first_line))

uncompressed_data = first_line + stream.read()
print('Uncompressed data length', len(uncompressed_data))
print('Matches the original data', text == uncompressed_data)
```
Output:

```
Original data length 1350
zlib-compressed data length 48
First line read b'abcdefghijklmnopqrstuvwxyz\n'
Uncompressed data length 1350
Matches the original data True
```
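For one-shot compression without a stream wrapper, the same codec can be used through `codecs.encode()`/`codecs.decode()`; a small sketch, not part of the original example:

```python
import codecs

data = b'abcdefghijklmnopqrstuvwxyz\n' * 50
compressed = codecs.encode(data, 'zlib_codec')
print(len(data), '->', len(compressed))
print(codecs.decode(compressed, 'zlib_codec') == data)  # True
```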
15. Incremental encoding and decoding with the bz2 codec

```python
import codecs
import sys

text = b'abcdefghijklmnopqrstuvwxyz\n'
repetitions = 50

print('Text length :', len(text))
print('Repetitions :', repetitions)
print('Length x repetitions :', len(text) * repetitions)

# Encode the text, one repetition at a time
encoder = codecs.getincrementalencoder('bz2')()
encoded = []

print('Encoding:')
last = repetitions - 1
for i in range(repetitions):
    en_c = encoder.encode(text, final=(i == last))
    if en_c:
        print('\nEncoded: {} bytes'.format(len(en_c)))
        encoded.append(en_c)
    else:
        sys.stdout.write('.')

all_encoded = b''.join(encoded)
print('Total encoded length', len(all_encoded))

# Decode the byte string one byte at a time
print('Decoding')
decoder = codecs.getincrementaldecoder('bz2')()
decoded = []

for i, b in enumerate(all_encoded):
    # final is only set on the very last byte of the encoded data
    final = (i + 1) == len(all_encoded)
    c = decoder.decode(bytes([b]), final)
    if c:
        print('\nDecoded : {}'.format(len(c)))
        decoded.append(c)
    else:
        sys.stdout.write('.')

restored = b''.join(decoded)
print('\nTotal decoded length', len(restored))
```
Output:

```
Text length : 27
Repetitions : 50
Length x repetitions : 1350
Encoding:
.................................................
Encoded: 99 bytes
Total encoded length 99
Decoding
........................................................................................
Decoded : 1350
..........
Total decoded length 1350
```
16. Data exchanged over the network is sent as encoded byte streams: wrapping a socket with codecs readers and writers

```python
import codecs
import socket
import socketserver
import threading


class Echo(socketserver.BaseRequestHandler):

    def handle(self):
        # Read the bytes sent by the client and echo them back
        data = self.request.recv(1024)
        self.request.send(data)


class PassThrough:
    """Wrap a file-like object and log everything read from or written to it."""

    def __init__(self, other):
        self.other = other

    def write(self, data):
        print('Writing :', repr(data))
        return self.other.write(data)

    def read(self, size=-1):
        print('Reading :', end=' ')
        data = self.other.read(size)
        print(repr(data))
        return data

    def flush(self):
        return self.other.flush()

    def close(self):
        return self.other.close()


if __name__ == '__main__':
    address = ('localhost', 8080)
    server = socketserver.TCPServer(address, Echo)
    ip, port = server.server_address

    # Run the echo server in a daemon thread
    task = threading.Thread(target=server.serve_forever)
    task.daemon = True
    task.start()

    # Connect to the server
    sk = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sk.connect((ip, port))

    # Wrap the socket's reader and writer with UTF-8 codec wrappers
    read_file = sk.makefile('rb')
    incoming = codecs.getreader('utf-8')(PassThrough(read_file))
    write_file = sk.makefile('wb')
    outgoing = codecs.getwriter('utf-8')(PassThrough(write_file))

    # Send the data
    text = 'français'
    print('Sending :', repr(text))
    outgoing.write(text)
    outgoing.flush()

    # Receive the response
    response = incoming.read()
    print('Received:', repr(response))

    # Clean up the socket handles
    sk.close()
    server.socket.close()
```
Output:

```
Sending : 'français'
Writing : b'fran\xc3\xa7ais'
Reading : b'fran\xc3\xa7ais'
Reading : b''
Received: 'français'
```
17. Swapping uppercase and lowercase letters

```python
import string

# string.ascii_lowercase : abcdefghijklmnopqrstuvwxyz
# string.ascii_uppercase : ABCDEFGHIJKLMNOPQRSTUVWXYZ


def invertcaps(text):
    """Return text with the case of all ASCII letters swapped."""
    return ''.join(
        c.upper() if c in string.ascii_lowercase
        else c.lower() if c in string.ascii_uppercase
        else c
        for c in text
    )


if __name__ == '__main__':
    print(invertcaps('ABCdef'))
    print(invertcaps('abcDEF'))
```
Output:

```
abcDEF
ABCdef
```
18. Swapping case with a custom character map

```python
import codecs
import string

# Map every character to itself
decoding_map = codecs.make_identity_dict(range(256))

# Make a list of pairs of ordinal values for the lowercase and uppercase letters
pairs = list(zip(
    [ord(c) for c in string.ascii_lowercase],
    [ord(c) for c in string.ascii_uppercase],
))

# Modify the mapping to convert upper to lower and lower to upper
decoding_map.update({
    upper: lower
    for (lower, upper)
    in pairs
})
decoding_map.update({
    lower: upper
    for (lower, upper)
    in pairs
})

# Create a separate encoding map
encoding_map = codecs.make_encoding_map(decoding_map)

if __name__ == '__main__':
    print(codecs.charmap_encode('abcDEF', 'strict', encoding_map))
    print(codecs.charmap_decode(b'abcDEF', 'strict', decoding_map))
    print(encoding_map == decoding_map)
```
Output:

```
(b'ABCdef', 6)
('ABCdef', 6)
True
```
19. Error handling with the custom character map

```python
import codecs
import string

# Map every character to itself
decoding_map = codecs.make_identity_dict(range(256))

# Make a list of pairs of ordinal values for the lower
# and uppercase letters
pairs = list(zip(
    [ord(c) for c in string.ascii_lowercase],
    [ord(c) for c in string.ascii_uppercase],
))

# Modify the mapping to convert upper to lower and
# lower to upper.
decoding_map.update({
    upper: lower
    for (lower, upper)
    in pairs
})
decoding_map.update({
    lower: upper
    for (lower, upper)
    in pairs
})

# Create a separate encoding map.
encoding_map = codecs.make_encoding_map(decoding_map)

if __name__ == '__main__':
    text = 'pi: \u03c0'

    for error in ['ignore', 'replace', 'strict']:
        try:
            encoded = codecs.charmap_encode(
                text, error, encoding_map)
        except UnicodeEncodeError as err:
            encoded = str(err)
        print('{:7}: {}'.format(error, encoded))
```
Output:

```
ignore : (b'PI: ', 5)
replace: (b'PI: ?', 5)
strict : 'charmap' codec can't encode character '\u03c0' in position 4: character maps to <undefined>
```
20. Registering custom codec search functions

```python
import codecs
import encodings  # ensure the standard codec search function is registered first


def search1(encoding):
    print('search1: Searching for:', encoding)
    return None


def search2(encoding):
    print('search2: Searching for:', encoding)
    return None


# Register the custom search functions; they are only consulted for
# encodings that the earlier search functions cannot resolve
codecs.register(search1)
codecs.register(search2)

utf8 = codecs.lookup('utf-8')
print('UTF-8:', utf8)

try:
    unknown = codecs.lookup('no-such-encoding')
except LookupError as err:
    print('ERROR:', err)
```
Output:

```
UTF-8: <codecs.CodecInfo object for encoding utf-8 at 0x2189e635fa8>
search1: Searching for: no-such-encoding
search2: Searching for: no-such-encoding
ERROR: unknown encoding: no-such-encoding
```
21. Defining and registering a complete custom codec with the codecs module

```python
import codecs
import io
import string

# Map every character to itself
decoding_map = codecs.make_identity_dict(range(256))

# Pairs of ordinal values for the lowercase and uppercase ASCII letters
pairs = list(zip(
    [ord(c) for c in string.ascii_lowercase],
    [ord(c) for c in string.ascii_uppercase],
))

# Update the map so upper maps to lower and lower maps to upper
decoding_map.update({
    upper: lower
    for (lower, upper)
    in pairs
})
decoding_map.update({
    lower: upper
    for (lower, upper)
    in pairs
})

# Create a separate encoding map
encoding_map = codecs.make_encoding_map(decoding_map)


class InvertCapsCodec(codecs.Codec):
    "Stateless encoder/decoder"

    def encode(self, input, errors='strict'):
        return codecs.charmap_encode(input, errors, encoding_map)

    def decode(self, input, errors='strict'):
        return codecs.charmap_decode(input, errors, decoding_map)


class InvertCapsIncrementalEncoder(codecs.IncrementalEncoder):

    def encode(self, input, final=False):
        data, nbytes = codecs.charmap_encode(input,
                                             self.errors,
                                             encoding_map)
        return data


class InvertCapsIncrementalDecoder(codecs.IncrementalDecoder):

    def decode(self, input, final=False):
        data, nbytes = codecs.charmap_decode(input,
                                             self.errors,
                                             decoding_map)
        return data


class InvertCapsStreamReader(InvertCapsCodec, codecs.StreamReader):
    pass


class InvertCapsStreamWriter(InvertCapsCodec, codecs.StreamWriter):
    pass


def find_invertcaps(encoding):
    """Return the codec for 'invertcaps'."""
    if encoding == 'invertcaps':
        return codecs.CodecInfo(
            name='invertcaps',
            encode=InvertCapsCodec().encode,
            decode=InvertCapsCodec().decode,
            incrementalencoder=InvertCapsIncrementalEncoder,
            incrementaldecoder=InvertCapsIncrementalDecoder,
            streamreader=InvertCapsStreamReader,
            streamwriter=InvertCapsStreamWriter,
        )
    return None


# Register the search function for the new codec
codecs.register(find_invertcaps)

if __name__ == '__main__':
    # Get the stateless encoder for the codec
    encoder = codecs.getencoder('invertcaps')
    text = 'abcDEF'
    encoded_text, consumed = encoder(text)
    print('Encoded "{}" to "{}", consuming {} characters'.format(
        text, encoded_text, consumed))

    # Stream writer
    buffer = io.BytesIO()
    writer = codecs.getwriter('invertcaps')(buffer)
    print('StreamWriter for io buffer: ')
    print(' writing "abcDEF"')
    writer.write('abcDEF')
    print(' buffer contents: ', buffer.getvalue())

    # Incremental decoder
    decoder_factory = codecs.getincrementaldecoder('invertcaps')
    decoder = decoder_factory()
    decoded_text_parts = []
    for c in encoded_text:
        decoded_text_parts.append(
            decoder.decode(bytes([c]), final=False)
        )
    decoded_text_parts.append(decoder.decode(b'', final=True))
    decoded_text = ''.join(decoded_text_parts)
    print('IncrementalDecoder converted {!r} to {!r}'.format(
        encoded_text, decoded_text))
```
Output:

```
Encoded "abcDEF" to "b'ABCdef'", consuming 6 characters
StreamWriter for io buffer: 
 writing "abcDEF"
 buffer contents:  b'ABCdef'
IncrementalDecoder converted b'ABCdef' to 'abcDEF'
```
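Once find_invertcaps is registered, the codec can also be used through the normal `str.encode()`/`bytes.decode()` interface; a short sketch, assuming the registration code above has already run in the same process:

```python
# Requires codecs.register(find_invertcaps) from the example above
print('abcDEF'.encode('invertcaps'))   # b'ABCdef'
print(b'ABCdef'.decode('invertcaps'))  # 'abcDEF'
```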