Python之codecs模塊的使用


 codecs模塊的作用

主要用於在不同數據之間轉換文本的編碼器和解碼器。 

1、將字節數據轉換為十六進制,並按指定的間隔切片

import binascii

def to_hex(t, nbytes):
    """Format the bytes in *t* as space-separated groups of hex digits.

    Args:
        t: Bytes-like object to render.
        nbytes: Number of input bytes shown in each group.

    Returns:
        A ``bytes`` value holding the lowercase hex digits of ``t`` with a
        single space between groups; the last group may be shorter.
    """
    # Every input byte is rendered as two hex digits.
    group_width = nbytes * 2
    hex_digits = binascii.hexlify(t)

    groups = []
    for offset in range(0, len(hex_digits), group_width):
        groups.append(hex_digits[offset:offset + group_width])
    return b' '.join(groups)

if __name__ == '__main__':
    # Render the same payload grouped by one byte, then by two bytes.
    sample = b'abcdef'
    for group_size in (1, 2):
        print(to_hex(sample, group_size))
codecs_to_hex.py

運行效果

b'61 62 63 64 65 66'
b'6162 6364 6566'

2、編碼UTF-8和UTF-16的示例

import binascii
import unicodedata


def to_hex(t, nbytes):
    """Format the bytes in *t* as space-separated groups of hex digits.

    Args:
        t: Bytes-like object to render.
        nbytes: Number of input bytes shown in each group.

    Returns:
        A ``bytes`` value holding the lowercase hex digits of ``t`` with a
        single space between groups; the last group may be shorter.
    """
    # Every input byte is rendered as two hex digits.
    group_width = nbytes * 2
    hex_digits = binascii.hexlify(t)

    groups = []
    for offset in range(0, len(hex_digits), group_width):
        groups.append(hex_digits[offset:offset + group_width])
    return b' '.join(groups)


if __name__ == '__main__':
    text = 'français'
    print('原數據    :{!r}'.format(text))

    # List each character next to its Unicode database name; a character
    # without a name falls back to the character itself.
    for ch in text:
        char_name = unicodedata.name(ch, ch)
        print(' {!r}: {}'.format(ch, char_name))

    # Encode the same text two ways and dump the resulting bytes.
    utf8_bytes = text.encode('utf-8')
    utf16_bytes = text.encode('utf-16')
    print('UTF-8 : {}'.format(to_hex(utf8_bytes, 1)))
    print('UTF-16 : {}'.format(to_hex(utf16_bytes, 2)))
codecs_encodings.py

運行效果

原數據    :'français'
 'f': LATIN SMALL LETTER F
 'r': LATIN SMALL LETTER R
 'a': LATIN SMALL LETTER A
 'n': LATIN SMALL LETTER N
 'ç': LATIN SMALL LETTER C WITH CEDILLA
 'a': LATIN SMALL LETTER A
 'i': LATIN SMALL LETTER I
 's': LATIN SMALL LETTER S
UTF-8 : b'66 72 61 6e c3 a7 61 69 73'
UTF-16 : b'fffe 6600 7200 6100 6e00 e700 6100 6900 7300'

3、解碼的示例

import binascii
import unicodedata


def to_hex(t, nbytes):
    """Format the bytes in *t* as space-separated groups of hex digits.

    Args:
        t: Bytes-like object to render.
        nbytes: Number of input bytes shown in each group.

    Returns:
        A ``bytes`` value holding the lowercase hex digits of ``t`` with a
        single space between groups; the last group may be shorter.
    """
    # Every input byte is rendered as two hex digits.
    group_width = nbytes * 2
    hex_digits = binascii.hexlify(t)

    groups = []
    for offset in range(0, len(hex_digits), group_width):
        groups.append(hex_digits[offset:offset + group_width])
    return b' '.join(groups)


if __name__ == '__main__':
    # Round-trip the text: str -> UTF-8 bytes -> str.
    original = 'français'
    as_bytes = original.encode('utf-8')
    round_tripped = as_bytes.decode('utf-8')

    report = (
        ('原來的數據 :', repr(original)),
        ('編碼過的內容 :', to_hex(as_bytes, 1), type(as_bytes)),
        ('解碼的內容 :', repr(round_tripped), type(round_tripped)),
    )
    for row in report:
        print(*row)
codecs_decode.py

運行效果

原來的數據 : 'français'
編碼過的內容 : b'66 72 61 6e c3 a7 61 69 73' <class 'bytes'>
解碼的內容 : 'français' <class 'str'>

4、codecs模塊打開文件設置編碼格式寫入內容

import binascii
import codecs
import sys


def to_hex(t, nbytes):
    """Format the bytes in *t* as space-separated groups of hex digits.

    Args:
        t: Bytes-like object to render.
        nbytes: Number of input bytes shown in each group.

    Returns:
        A ``bytes`` value holding the lowercase hex digits of ``t`` with a
        single space between groups; the last group may be shorter.
    """
    # Every input byte is rendered as two hex digits.
    group_width = nbytes * 2
    hex_digits = binascii.hexlify(t)

    groups = []
    for offset in range(0, len(hex_digits), group_width):
        groups.append(hex_digits[offset:offset + group_width])
    return b' '.join(groups)


# The target encoding comes from the first command-line argument and also
# names the output file (e.g. "utf-8" -> "utf-8.txt").
encoding = sys.argv[1]
filename = encoding + '.txt'
print('寫入的文件名', filename)

# Open the file through codecs so the text is encoded with the requested
# encoding as it is written.
with codecs.open(filename, mode='w', encoding=encoding) as wf:
    wf.write('français')

# Width, in bytes, of one code unit for each encoding; used only to group
# the hex dump below. UTF-32 code units are four bytes wide.
nbytes = {
    'utf-8': 1,
    'utf-16': 2,
    'utf-32': 4,
}.get(encoding, 1)

# Re-read the file in binary mode to show the bytes actually stored.
print('讀取文件內容')
with open(filename, mode='rb') as rf:
    print(to_hex(rf.read(), nbytes))
codecs_open_write.py

 運行效果

寫入的文件名 utf-8.txt
讀取文件內容
b'66 72 61 6e c3 a7 61 69 73'

5、設置解碼格式讀取文件內容

import codecs
import sys

# The encoding comes from the first command-line argument and also names
# the file to read (e.g. "utf-8" -> "utf-8.txt").
encoding = sys.argv[1]
filename = encoding + '.txt'
print('讀取的文件內容', filename)

# Open the file through codecs so its bytes are decoded with the requested
# encoding as they are read.
with codecs.open(filename, mode='r', encoding=encoding) as rf:
    contents = rf.read()
    print(repr(contents))
codecs_open_read.py

 運行效果

讀取的文件內容 utf-8.txt
'français'

6、打印出字節序

import codecs

import binascii


def to_hex(t, nbytes):
    """Format the bytes in *t* as space-separated groups of hex digits.

    Defined locally so this listing runs on its own; without it the loop
    below fails with ``NameError`` because nothing else provides ``to_hex``.

    Args:
        t: Bytes-like object to render.
        nbytes: Number of input bytes shown in each group.

    Returns:
        A ``bytes`` value holding the lowercase hex digits of ``t`` with a
        single space between groups; the last group may be shorter.
    """
    # Every input byte is rendered as two hex digits.
    chars_per_item = nbytes * 2
    hex_version = binascii.hexlify(t)
    return b' '.join(
        hex_version[start:start + chars_per_item]
        for start in range(0, len(hex_version), chars_per_item)
    )


# Names of the byte-order-mark constants exposed by the codecs module.
BOM_TYPES = [
    'BOM', 'BOM_BE', 'BOM_LE',
    'BOM_UTF8',
    'BOM_UTF16', 'BOM_UTF16_BE', 'BOM_UTF16_LE',
    'BOM_UTF32', 'BOM_UTF32_BE', 'BOM_UTF32_LE',
]

for name in BOM_TYPES:
    # Look each constant up by name on the codecs module.
    bom_value = getattr(codecs, name)
    print('{:12} : {}'.format(name, to_hex(bom_value, 2)))
codecs_bom.py

運行效果

BOM          : b'fffe'
BOM_BE       : b'feff'
BOM_LE       : b'fffe'
BOM_UTF8     : b'efbb bf'
BOM_UTF16    : b'fffe'
BOM_UTF16_BE : b'feff'
BOM_UTF16_LE : b'fffe'
BOM_UTF32    : b'fffe 0000'
BOM_UTF32_BE : b'0000 feff'
BOM_UTF32_LE : b'fffe 0000'

7、codecs模塊,字節排序由解碼器在編解碼器中自動檢測和處理,但是可以在編碼時指定顯式排序。

import binascii
import codecs


def to_hex(t, nbytes):
    """Format the bytes in *t* as space-separated groups of hex digits.

    Args:
        t: Bytes-like object to render.
        nbytes: Number of input bytes shown in each group.

    Returns:
        A ``bytes`` value holding the lowercase hex digits of ``t`` with a
        single space between groups; the last group may be shorter.
    """
    # Every input byte is rendered as two hex digits.
    group_width = nbytes * 2
    hex_digits = binascii.hexlify(t)

    groups = []
    for offset in range(0, len(hex_digits), group_width):
        groups.append(hex_digits[offset:offset + group_width])
    return b' '.join(groups)


# Deliberately pick the byte order that is NOT the platform's native one.
native_is_big_endian = codecs.BOM_UTF16 == codecs.BOM_UTF16_BE
if native_is_big_endian:
    bom, encoding = codecs.BOM_UTF16_LE, 'utf_16_le'
else:
    bom, encoding = codecs.BOM_UTF16_BE, 'utf_16_be'

print('Native order', to_hex(codecs.BOM_UTF16, 2))
print('Selected order', to_hex(bom, 2))

# The explicit-endianness codecs do not emit a BOM themselves.
encoded_text = 'français'.encode(encoding)
print('{:14} : {}'.format(encoding, to_hex(encoded_text, 2)))

# Write the BOM by hand, followed by the encoded text.
with open('nonnative-encoded.txt', mode='wb') as wf:
    wf.write(bom + encoded_text)
codecs_bom_create_file.py

 運行效果

Native order b'fffe'
Selected order b'feff'
utf_16_be      : b'0066 0072 0061 006e 00e7 0061 0069 0073'

8、codecs模塊,在打開文件時沒有指定字節順序,因此解碼器使用文件前兩個字節中的BOM值來確定它

import binascii
import codecs


def to_hex(t, nbytes):
    """Format the bytes in *t* as space-separated groups of hex digits.

    Args:
        t: Bytes-like object to render.
        nbytes: Number of input bytes shown in each group.

    Returns:
        A ``bytes`` value holding the lowercase hex digits of ``t`` with a
        single space between groups; the last group may be shorter.
    """
    # Every input byte is rendered as two hex digits.
    group_width = nbytes * 2
    hex_digits = binascii.hexlify(t)

    groups = []
    for offset in range(0, len(hex_digits), group_width):
        groups.append(hex_digits[offset:offset + group_width])
    return b' '.join(groups)


# First look at the file exactly as stored on disk.
with open('nonnative-encoded.txt', mode='rb') as raw_file:
    raw_bytes = raw_file.read()
print('Raw  :', to_hex(raw_bytes, 2))

# Then decode it as generic 'utf-16', without naming a byte order.
with codecs.open('nonnative-encoded.txt', encoding='utf-16') as decoded_file:
    decoded_text = decoded_file.read()

print('解碼的數據', repr(decoded_text))
codecs_bom_detection.py

 運行效果

Raw  : b'feff 0066 0072 0061 006e 00e7 0061 0069 0073'
解碼的數據 'français'

 9、編碼錯誤的處理

錯誤的模式 描述
strict 如果數據無法轉換時,則拋出一個異常。
replace 將一個無法轉換的數據,替換為一個特殊的標記字符
ignore 直接跳過無法轉換的數據
xmlcharrefreplace 替換為XML字符引用,例如 &#231; (僅編碼)
backslashreplace 替換為反斜杠轉義序列,例如 \xe7 (僅編碼)

10、編碼錯誤的處理示例

import codecs
import sys

# The error-handling mode (strict, replace, ignore, ...) is taken from the
# first command-line argument.
error_handling = sys.argv[1]

text = 'français'

try:
    # Write non-ASCII text through an ASCII-encoding file object so the
    # chosen error mode decides what happens to the unencodable character.
    with codecs.open('encode_error.txt', mode='w',
                     encoding='ascii',
                     errors=error_handling) as encoded_file:
        encoded_file.write(text)
except UnicodeEncodeError as err:
    print('ERROR:', err)
else:
    # Writing succeeded, so show the bytes that ended up in the file.
    with open('encode_error.txt', mode='rb') as raw_file:
        stored = raw_file.read()
    print('File contents: {!r}'.format(stored))
codecs_encode_error.py

測試效果

$ python3 codecs_encode_error.py strict
ERROR: 'ascii' codec can't encode character '\xe7' in position 4: ordinal not in range(128)

$ python3 codecs_encode_error.py replace
File contents: b'fran?ais'

$ python3 codecs_encode_error.py ignore
File contents: b'franais'

$ python3 codecs_encode_error.py xmlcharrefreplace
File contents: b'fran&#231;ais'

11、解碼錯誤的處理

import codecs
import sys
import binascii


def to_hex(t, nbytes):
    """Format the bytes in *t* as space-separated groups of hex digits.

    Args:
        t: Bytes-like object to render.
        nbytes: Number of input bytes shown in each group.

    Returns:
        A ``bytes`` value holding the lowercase hex digits of ``t`` with a
        single space between groups; the last group may be shorter.
    """
    # Every input byte is rendered as two hex digits.
    group_width = nbytes * 2
    hex_digits = binascii.hexlify(t)

    groups = []
    for offset in range(0, len(hex_digits), group_width):
        groups.append(hex_digits[offset:offset + group_width])
    return b' '.join(groups)


# The error-handling mode is taken from the first command-line argument.
error_handling = sys.argv[1]

text = 'français'
print('源數據     :', repr(text))

# Store the text as UTF-16.
with codecs.open('decode_error.txt', mode='w',
                 encoding='utf-16') as utf16_file:
    utf16_file.write(text)

# Dump the stored bytes, one byte per hex group.
with open('decode_error.txt', mode='rb') as raw_file:
    stored = raw_file.read()
print('File contents:', to_hex(stored, 1))

# Read the file back with the wrong encoding on purpose; the chosen error
# mode decides whether this raises or yields substituted text.
with codecs.open('decode_error.txt', mode='r',
                 encoding='utf-8',
                 errors=error_handling) as utf8_file:
    try:
        data = utf8_file.read()
    except UnicodeDecodeError as err:
        print('ERROR:', err)
    else:
        print('Read         :', repr(data))
codecs_decode_error.py

測試效果

源數據     : 'français'
File contents: b'ff fe 66 00 72 00 61 00 6e 00 e7 00 61 00 69 00 73 00'
Read         : '��f\x00r\x00a\x00n\x00�\x00a\x00i\x00s\x00'

 12、使用codecs.EncodedFile在數據編碼(data_encoding)與文件編碼(file_encoding)之間轉換的示例(以io.BytesIO作為文件)

import codecs
import io
import binascii


def to_hex(t, nbytes):
    """Format the bytes in *t* as space-separated groups of hex digits.

    Args:
        t: Bytes-like object to render.
        nbytes: Number of input bytes shown in each group.

    Returns:
        A ``bytes`` value holding the lowercase hex digits of ``t`` with a
        single space between groups; the last group may be shorter.
    """
    # Every input byte is rendered as two hex digits.
    group_width = nbytes * 2
    hex_digits = binascii.hexlify(t)

    groups = []
    for offset in range(0, len(hex_digits), group_width):
        groups.append(hex_digits[offset:offset + group_width])
    return b' '.join(groups)


data = 'français'

utf8 = data.encode('utf-8')
print('將utf-8編碼結果轉換十六進制,並且設置1個字節用空格分割', to_hex(utf8, 1))

# Writing direction: UTF-8 bytes passed to write() are stored in the
# underlying buffer as UTF-16 (the file_encoding).
output = io.BytesIO()
utf16_writer = codecs.EncodedFile(
    output,
    data_encoding='utf-8',
    file_encoding='utf-16',
)
utf16_writer.write(utf8)

utf16 = output.getvalue()
print('使用file_encoding編碼獲取的值', to_hex(utf16, 2))

# Reading direction: UTF-16 bytes held in the buffer come back from read()
# as UTF-8 (the data_encoding).
buffer = io.BytesIO(utf16)
utf8_reader = codecs.EncodedFile(
    buffer,
    data_encoding='utf-8',
    file_encoding='utf-16',
)
recoded = utf8_reader.read()
print('使用data_encoding編碼獲取的值', to_hex(recoded, 1))
codecs_encodedfile.py

測試效果

將utf-8編碼結果轉換十六進制,並且設置1個字節用空格分割 b'66 72 61 6e c3 a7 61 69 73'
使用file_encoding編碼獲取的值 b'fffe 6600 7200 6100 6e00 e700 6100 6900 7300'
使用data_encoding編碼獲取的值 b'66 72 61 6e c3 a7 61 69 73'

13、非unicode的編碼示例

import codecs
import io

text = 'abcdefghijklmnopqrstuvwxyz'

# rot_13 is a text-to-text codec, so the wrapped stream is a StringIO.
buffer = io.StringIO()
make_writer = codecs.getwriter('rot_13')
stream = make_writer(buffer)
stream.write(text)
stream.flush()

rotated = buffer.getvalue()
print('源數據', text)
print('ROT_13', rotated)
codecs_rot13.py

測試效果

源數據 abcdefghijklmnopqrstuvwxyz
ROT_13 nopqrstuvwxyzabcdefghijklm

14、利用zlib編碼進行數據的壓縮與解壓

import codecs
import io

text = b'abcdefghijklmnopqrstuvwxyz\n' * 50

# Compress: everything written to the stream lands in the buffer as zlib data.
buffer = io.BytesIO()
make_writer = codecs.getwriter('zlib')
stream = make_writer(buffer)
stream.write(text)
stream.flush()

print('源數據長度', len(text))

compressed_data = buffer.getvalue()
print('zlib壓縮后的數據長度', len(compressed_data))

# Decompress: wrap the compressed bytes in a reader for the same codec.
buffer = io.BytesIO(compressed_data)
make_reader = codecs.getreader('zlib')
stream = make_reader(buffer)

first_line = stream.readline()
print('讀取第一行', repr(first_line))

# The rest of the stream, joined to the first line, is the full payload.
remainder = stream.read()
uncompressed_data = first_line + remainder
print('解壓后的數據長度', len(uncompressed_data))
print('與源數據進行比較', text == uncompressed_data)
codecs_zlib.py

 測試效果

源數據長度 1350
zlib壓縮后的數據長度 48
讀取第一行 b'abcdefghijklmnopqrstuvwxyz\n'
解壓后的數據長度 1350
與源數據進行比較 True

15、增量bz2編碼的示例

import codecs
import sys

text = b'abcdefghijklmnopqrstuvwxyz\n'
repetitions = 50

print('文本長度 :', len(text))
print('重復次數 :', repetitions)
print('乘於重復次數的長度:', len(text) * repetitions)

# Feed the text to the incremental encoder one repetition at a time; only
# the last call is flagged as final.
encoder = codecs.getincrementalencoder('bz2')()
encoded = []
print('編碼:')
last = repetitions - 1
for i in range(repetitions):
    en_c = encoder.encode(text, final=(i == last))
    if en_c:
        print('\nEncoded: {} bytes'.format(len(en_c)))
        encoded.append(en_c)
    else:
        # No output yet for this chunk; print a progress dot instead.
        sys.stdout.write('.')

all_encoded = b''.join(encoded)
print('總的編碼長度', len(all_encoded))

# Decode the compressed data one byte at a time.
print('解碼')
decoder = codecs.getincrementaldecoder('bz2')()
decoded = []
# The final flag belongs to the last byte of the *encoded* stream being
# iterated, not to the length of the original text.
last_encoded_index = len(all_encoded) - 1
for i, b in enumerate(all_encoded):
    c = decoder.decode(bytes([b]), final=(i == last_encoded_index))
    if c:
        print('\nDecoded : {}'.format(len(c)))
        decoded.append(c)
    else:
        sys.stdout.write('.')
restored = b''.join(decoded)
print('\n解壓后的總長度', len(restored))
codecs_incremental_bz2.py

測試效果

文本長度 : 27
重復次數 : 50
乘於重復次數的長度: 1350
編碼:
.................................................
Encoded: 99 bytes
總的編碼長度 99
解碼
........................................................................................
Decoded : 1350
..........
解壓后的總長度 1350

16、通過網絡套接字發送和接收以UTF-8編碼的字節流的示例

import socketserver
import socket
import threading
import codecs


class Echo(socketserver.BaseRequestHandler):
    """Request handler that sends back whatever the client sent."""

    def handle(self):
        """Read up to 1024 bytes from the client and echo them back."""
        data = self.request.recv(1024)
        # sendall() keeps writing until every byte is sent; a bare send()
        # may transmit only part of the data.
        self.request.sendall(data)


class PassThrough:
    """File-like wrapper that prints the data passing through write()/read().

    Every call is forwarded unchanged to the wrapped object, ``other``.
    """

    def __init__(self, other):
        self.other = other

    def write(self, data):
        """Print *data*, then write it to the wrapped object."""
        print('寫入', repr(data))
        written = self.other.write(data)
        return written

    def read(self, size=-1):
        """Read from the wrapped object and print what came back."""
        # Print the label first, without a newline, so the data read below
        # appears on the same output line.
        print('Reading :', end=' ')
        chunk = self.other.read(size)
        print(repr(chunk))
        return chunk

    def flush(self):
        """Flush the wrapped object."""
        return self.other.flush()

    def close(self):
        """Close the wrapped object."""
        return self.other.close()


if __name__ == '__main__':
    # Port 0 lets the operating system pick any free port, so the demo does
    # not fail when a fixed port is already in use; the port actually bound
    # is read back from the server below.
    address = ('localhost', 0)
    server = socketserver.TCPServer(address, Echo)
    ip, port = server.server_address

    # Run the server in a daemon thread so it never blocks interpreter exit.
    task = threading.Thread(target=server.serve_forever, daemon=True)
    task.start()

    try:
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sk:
            sk.connect((ip, port))

            # Wrap the socket in file objects, then layer a UTF-8 stream
            # reader/writer on top; PassThrough prints the raw bytes.
            with sk.makefile('rb') as read_file, \
                    sk.makefile('wb') as write_file:
                incoming = codecs.getreader('utf-8')(PassThrough(read_file))
                outgoing = codecs.getwriter('utf-8')(PassThrough(write_file))

                # Send the data.
                text = 'français'
                print('Sending :', repr(text))
                outgoing.write(text)
                outgoing.flush()

                # Receive the echoed data; read() returns once the server
                # closes its side of the connection.
                response = incoming.read()
                print('Received:', repr(response))
    finally:
        # Stop the serve_forever() loop, then release the listening socket.
        server.shutdown()
        server.server_close()
codecs_socket.py

測試效果

Sending : 'français'
寫入 b'fran\xc3\xa7ais'
Reading : b'fran\xc3\xa7ais'
Reading : b''
Received: 'français'

17、大寫轉小寫,小寫轉大寫的示例

import string


# string.ascii_lowercase : abcdefghijklmnopqrstuvwxyz
# string.ascii_uppercase : ABCDEFGHIJKLMNOPQRSTUVWXYZ
def invertcaps(text):
    """Return *text* with the case of every ASCII letter swapped.

    Characters outside a-z / A-Z are copied through unchanged.
    """
    swapped = []
    for ch in text:
        if ch in string.ascii_lowercase:
            swapped.append(ch.upper())
        elif ch in string.ascii_uppercase:
            swapped.append(ch.lower())
        else:
            swapped.append(ch)
    return ''.join(swapped)


if __name__ == '__main__':
    # Swap the case of two mixed-case samples.
    for sample in ('ABCdef', 'abcDEF'):
        print(invertcaps(sample))
codecs_invertcaps.py

測試效果

abcDEF
ABCdef

18、自定義映射表,大寫轉小寫,小寫轉大寫的示例

import string
import codecs

# Start from an identity mapping for every single-byte value.
decoding_map = codecs.make_identity_dict(range(256))

# (lowercase ordinal, uppercase ordinal) for each ASCII letter.
pairs = [
    (ord(lower), ord(upper))
    for lower, upper in zip(string.ascii_lowercase, string.ascii_uppercase)
]

# Swap the case of every ASCII letter, in both directions.
decoding_map.update((upper, lower) for lower, upper in pairs)
decoding_map.update((lower, upper) for lower, upper in pairs)

# Build a separate encoding map from the decoding map.
encoding_map = codecs.make_encoding_map(decoding_map)

if __name__ == '__main__':
    encoded_result = codecs.charmap_encode('abcDEF', 'strict', encoding_map)
    print(encoded_result)
    decoded_result = codecs.charmap_decode(b'abcDEF', 'strict', decoding_map)
    print(decoded_result)
    print(encoding_map == decoding_map)
codecs_invertcaps_charmap.py

測試效果

(b'ABCdef', 6)
('ABCdef', 6)
True

19、自定義映射表,大寫轉小寫,小寫轉大寫的錯誤的處理

import codecs
import string

# Start from an identity mapping for every single-byte value.
decoding_map = codecs.make_identity_dict(range(256))

# (lowercase ordinal, uppercase ordinal) for each ASCII letter.
pairs = [
    (ord(lower), ord(upper))
    for lower, upper in zip(string.ascii_lowercase, string.ascii_uppercase)
]

# Swap the case of every ASCII letter, in both directions.
decoding_map.update((upper, lower) for lower, upper in pairs)
decoding_map.update((lower, upper) for lower, upper in pairs)

# Build a separate encoding map from the decoding map.
encoding_map = codecs.make_encoding_map(decoding_map)

if __name__ == '__main__':
    # The Greek letter pi has no entry in the 256-value map, so each error
    # mode handles it differently.
    text = 'pi: \u03c0'

    for error in ('ignore', 'replace', 'strict'):
        try:
            outcome = codecs.charmap_encode(text, error, encoding_map)
        except UnicodeEncodeError as err:
            outcome = str(err)
        print('{:7}: {}'.format(error, outcome))
codecs_invertcaps_error.py

測試效果

ignore : (b'PI: ', 5)
replace: (b'PI: ?', 5)
strict : 'charmap' codec can't encode character '\u03c0' in position 4: character maps to <undefined>

20、自定義搜索函數,用於搜索模塊支持編碼的格式

import codecs
import encodings


def search1(encoding):
    """Codec search function that logs the lookup and provides no codec.

    Always returns ``None``.
    """
    label = 'search1: Searching for:'
    print(label, encoding)
    return None


def search2(encoding):
    """Codec search function that logs the lookup and provides no codec.

    Always returns ``None``.
    """
    label = 'search2: Searching for:'
    print(label, encoding)
    return None


# Install both search functions, in order.
codecs.register(search1)
codecs.register(search2)

# Look up a name the standard library already knows.
utf8 = codecs.lookup('utf-8')
print('UTF-8:', utf8)

# Look up a name nobody provides; every search function declines and the
# lookup fails.
try:
    unknown = codecs.lookup('no-such-encoding')
except LookupError as lookup_error:
    print('ERROR:', lookup_error)
codecs_register.py

測試效果

UTF-8: <codecs.CodecInfo object for encoding utf-8 at 0x2189e635fa8>
search1: Searching for: no-such-encoding
search2: Searching for: no-such-encoding
ERROR: unknown encoding: no-such-encoding

21、利用codecs模塊擴展自定義編碼功能的示例

import codecs
import string

# Start from an identity mapping for every single-byte value.
decoding_map = codecs.make_identity_dict(range(256))

# (lowercase ordinal, uppercase ordinal) for each ASCII letter.
pairs = [
    (ord(lower), ord(upper))
    for lower, upper in zip(string.ascii_lowercase, string.ascii_uppercase)
]

# Swap the case of every ASCII letter, in both directions.
decoding_map.update((upper, lower) for lower, upper in pairs)
decoding_map.update((lower, upper) for lower, upper in pairs)

# Build a separate encoding map from the decoding map.
encoding_map = codecs.make_encoding_map(decoding_map)


class InvertCapsCodec(codecs.Codec):
    """Stateless encoder/decoder built on the module-level charmap tables."""

    def encode(self, input, errors='strict'):
        """Encode *input* through ``encoding_map``; returns (bytes, length)."""
        result = codecs.charmap_encode(input, errors, encoding_map)
        return result

    def decode(self, input, errors='strict'):
        """Decode *input* through ``decoding_map``; returns (str, length)."""
        result = codecs.charmap_decode(input, errors, decoding_map)
        return result


class InvertCapsIncrementalEncoder(codecs.IncrementalEncoder):
    """Incremental encoder; each chunk is encoded independently."""

    def encode(self, input, final=False):
        """Encode one chunk and return only the encoded data."""
        # charmap_encode returns (data, length consumed); the length is
        # not needed here.
        data, _consumed = codecs.charmap_encode(
            input, self.errors, encoding_map)
        return data


class InvertCapsIncrementalDecoder(codecs.IncrementalDecoder):
    """Incremental decoder; each chunk is decoded independently."""

    def decode(self, input, final=False):
        """Decode one chunk and return only the decoded text."""
        # charmap_decode returns (text, length consumed); the length is
        # not needed here.
        data, _consumed = codecs.charmap_decode(
            input, self.errors, decoding_map)
        return data


class InvertCapsStreamReader(InvertCapsCodec, codecs.StreamReader):
    """Stream reader combining InvertCapsCodec with codecs.StreamReader."""


class InvertCapsStreamWriter(InvertCapsCodec, codecs.StreamWriter):
    """Stream writer combining InvertCapsCodec with codecs.StreamWriter."""


def find_invertcaps(encoding):
    """Codec search function for the 'invertcaps' encoding.

    Returns a ``codecs.CodecInfo`` when *encoding* is ``'invertcaps'`` and
    ``None`` for any other name.
    """
    if encoding != 'invertcaps':
        return None

    codec = InvertCapsCodec()
    return codecs.CodecInfo(
        name='invertcaps',
        encode=codec.encode,
        decode=codec.decode,
        incrementalencoder=InvertCapsIncrementalEncoder,
        incrementaldecoder=InvertCapsIncrementalDecoder,
        streamreader=InvertCapsStreamReader,
        streamwriter=InvertCapsStreamWriter,
    )


# Register the search function so the 'invertcaps' codec can be looked up.
codecs.register(find_invertcaps)

if __name__ == '__main__':

    # Stateless encoder obtained through the codec registry.
    encoder = codecs.getencoder('invertcaps')
    text = 'abcDEF'
    encoded_text, consumed = encoder(text)
    print('Encoded "{}" to "{}", consuming {} characters'.format(
        text, encoded_text, consumed))

    # Stream writer wrapped around an in-memory bytes buffer.
    import io

    buffer = io.BytesIO()
    make_writer = codecs.getwriter('invertcaps')
    writer = make_writer(buffer)
    print('StreamWriter for io buffer: ')
    print('  writing "abcDEF"')
    writer.write('abcDEF')
    print('  buffer contents: ', buffer.getvalue())

    # Incremental decoder, fed one byte at a time and then finalised with
    # an empty chunk.
    decoder_factory = codecs.getincrementaldecoder('invertcaps')
    decoder = decoder_factory()
    decoded_text_parts = [
        decoder.decode(bytes([byte_value]), final=False)
        for byte_value in encoded_text
    ]
    decoded_text_parts.append(decoder.decode(b'', final=True))
    decoded_text = ''.join(decoded_text_parts)
    print('IncrementalDecoder converted {!r} to {!r}'.format(
        encoded_text, decoded_text))
codecs_invertcaps_register.py

測試效果

Encoded "abcDEF" to "b'ABCdef'", consuming 6 characters
StreamWriter for io buffer: 
  writing "abcDEF"
  buffer contents:  b'ABCdef'
IncrementalDecoder converted b'ABCdef' to 'abcDEF'

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM