使用 huffman 編碼壓縮與解壓縮(python)


一、huffman 編碼

1.1 huffman 編碼介紹

哈夫曼編碼(Huffman Coding),又稱霍夫曼編碼,是一種編碼方式,哈夫曼編碼是可變字長編碼(VLC)的一種。Huffman於1952年提出一種編碼方法,該方法完全依據字符出現概率來構造異字頭的平均長度最短的碼字,有時稱之為最佳編碼,一般就叫做Huffman編碼(有時也稱為霍夫曼編碼)

huffman 編碼是最優碼,也是即時碼(證明較為復雜,在此不給出證明)

1.2 huffman 編碼

此處介紹二元 huffman 編碼

給定一個字符集合 \(S=\{s_0,s_1,\cdots,s_q\}\),每個字符的概率為 \(P=\{p_0,p_1,\cdots,p_q\}\)

  1. 將字符集合按概率由大到小重新排列,即使得 \(p_i ≥ p_{i+1}\)

  2. 將最末尾的兩個字符 \(s_{q-1}\)\(s_q\)合並,記為 \(s'\)\(s'\) 的概率為 \(p'=p_{q-1}+p_q\)

  3. 如果剩余的字符數不為 1,則回到第 1 步

huffman 編碼的過程大致就上述三步

當編碼完畢后,會構建出一棵碼樹,每個碼字的編碼可以從碼樹中獲得

舉個簡單的例子,對於字符集合 \(S=\{A,B,C\}\),概率為 \(P={0.5,0.3,0.2}\)

  • \(B,C\) 合並,記為 \(BC\)\(p(BC)=p(B)+p(C)=0.3+0.2=0.5\)

  • 然后將 \(A,BC\) 合並,記為 \(ABC\)\(p(ABC)=p(A)+p(BC)=0.5+0.5=1\)

記根節點 ABC 碼字為空,從根節點向下遍歷,往左走碼字末尾添0,往右走添1,那么 A, B, C 的碼字分別為 0, 10, 11,編碼表就是 \(\{A:0,B:10,C:11\}\)

1.3 huffman 編程實現

見后文的代碼實現

1.4 測試

首先在代碼目錄下新建一個newfile.txt,里邊寫入ABCDEFG
下面的代碼實現了讀取newfile.txt並對其壓縮,將結果輸出至output.enc
然后對output.enc進行解壓縮,將解壓縮結果輸出至output.dec

# 讀入文件
with open('newfile.txt', 'rb') as fp_in:
    str_bytes = fp_in.read()

# 構建huffman編碼
fre_dic = bytes_fre(str_bytes)
huffman_dic = build(fre_dic)

# 對文本進行編碼
str_enc, padding = encode(str_bytes, huffman_dic, False)

# 輸出至文件 output.enc
with open('output.enc', 'wb') as fp_out:
    fp_out.write(str_enc)

# 對編碼后的文本進行解碼
str_dec = decode(str_enc, huffman_dic, padding, False)

# 輸出至文件 output.dec
with open('output.dec', 'wb') as fp_out:
    fp_out.write(str_dec)

# 打印huffman字典和填充位
print('huffman_dic:', huffman_dic)
print('padding:', padding)

觀察壓縮前和壓縮后的文件,發現經過我們的程序壓縮之后,文件小了很多

使用 winhex 打開 newfile.txt, output.enc, output.dec 查看對應的十六進制數值

觀察 newfile.txtoutput.dec 的十六進制數值,發現一模一樣,說明我們的壓縮和解壓並沒有問題,程序正確

接下來來分析 output.enc 文件的內容

output.enc 中的數值為 C1 4E 50,轉化成二進制為 11000001 01001110 01010000

我們將中間構造出的編碼表打印出來,可以得到編碼表

字符 A B C D E F G
編碼 11 000 001 010 011 100 101

以及填充位 padding 的值為 4

我們對 newfile.txt 中的字符 ABCDEFG 進行編碼,結果為 11 000 001 010 011 100 101。按 8 位二進制數一組划分后的結果為 11000001 01001110 0101,最后一組不滿 8 位,需要補充 4 個 0padding 值為 4,最后結果為 11000001 01001110 01010000,即 C1 4E 50

我們通過手算得出的編碼輸出和程序的編碼輸出一致!
說明程序正確!

二、利用 huffman 編碼進行數據壓縮

上面我們已經介紹了數據壓縮的原理與最優碼 huffman 編碼,我們可以使用 huffman 編碼對原先用 ASCII 碼編碼的數據重新編碼,以達到數據壓縮的目的

2.1 壓縮

上面已經給出了 huffman 編碼壓縮的算法,而且也有很不錯的壓縮效率。但需要注意的是,無論是壓縮的時候,還是解壓的時候,我們都用到了從原文件中生成的 huffman 編碼表和填充位數

思考一下我們平時的壓縮軟件,給它一個壓縮包,它就能自動解壓出原先的文件。但我們剛剛編寫的程序似乎不行,它解碼的時候需要用到一個編碼表,而這個編碼表只能從未編碼的文件中獲得,我們沒辦法只靠一個壓縮后的文件就能恢復出源文件

那么,我們需要將編碼表一並存入壓縮文件中,在解壓的時候讀取編碼表進行解壓

2.2 碼表存儲

怎么存碼表?這是一個非常復雜的問題。一個簡單的方法是直接用明文存儲,就像下圖那樣

事實上,根本不會有任何壓縮軟件采用這種方式存儲碼表,因為這占用非常大的存儲空間。就拿上面的例子來說,一個 ASCII 字符占用 \(8\) 個二進制位,一共有 \(13\) 個字符,那么共占用了 \(8 \times 13 = 104\) 個二進制位!

看上去似乎並不大,但如果待編碼字符數量增加到 256 個,最好的情況是每個碼字都占用 \(8\) 個二進制位,那么存儲碼表需要 \(256 \times (8+8+8 \times 8) = 20480\) 個二進制位!也就是 \(\frac{20480}{8 \times 1024} = 2.5\) kb !

好吧,似乎看起來也不算太大,但它其實可以更小!

2.2.1 范式 huffman 編碼

范式huffman編碼對原先 huffman 編碼的碼樹進行重構,使之變成具有特定規律的碼樹:位於碼樹同一層的碼字,字典序小的靠左,字典序大的靠右,有后代的結點位於沒有后代的結點的右側

范式huffman 碼字具有以下特點

  1. 將碼樹的碼字由上至下,由左至右依次排列成 \(w_0,w_1,\cdots,w_n\)

  2. 對於同一層的碼字,最左側的碼字總是形如 \(x\)0,之后的碼字 \(w_i\) 都是其左側碼字的值加 1,即 \(w_i = w_{i-1} + 1\)

  3. 對於不在同一層的碼字,設間隔層數為 \(d\),則 \(w_i = (w_{i-1}+1)\)0\(\cdots\)0 (\(d\)\(0\))

以右圖為例,\(A,B,C,D\) 四個字符對應的碼字剛好符合上述特點

字符 A B C D
碼字 0 10 110 111

還原的時候,只需要知道字符序列 \(c_i\) 和對應的碼字長度 \(l_i\),即可還原出最初的碼字序列 \(w_i\)。根據碼字長度可以確定出碼字所在碼樹的層數,再結合 \(w_i\)\(w_{i-1}\) 的遞推關系,便可唯一確定碼字序列

在構建 范式huffman 碼表的時候,並不需要手動調節先前通過 huffman 編碼所得的碼樹,只需要利用 huffman 編碼所得到的碼長序列,並按規則從新排列字符序列,傳入 范式huffman 編碼的構造函數之中,便可得到對應的碼表

2.2.2 范式 huffman 的碼表存儲

|占用|1字節|1字節|m字節|n字節|x字節|
|-|-|-|-|-|-|-|
|含義|填充位數|最大碼字長度m|\([1,m]\)長度的碼字數量|字符序列|數據部分|
|記號|padding|m|length|char_lst|data|

通過讀取 m 確定最大碼字長度,后續的 m 個字節為 \([1,m]\) 長度的碼字數目,進行處理可以獲得碼字長度序列 length_lst。對 length 求和可以獲得字符個數 n,后續的 n 個字節為對應的字符序列 char_lst。通過 char_lstlength_lst 可以還原出碼表

由於所有的字符數量為 256 個,最深碼樹深度為 255,故 m 並不會溢出(一個字節所能表示的數據范圍為\([0,255]\))

但需要注意的是,若所有 256 個字符全部位於碼樹同一層,將無法用一個字節表示該層的碼字數量。對於這種情況,將所有的碼字數量全部置 0,即 length 全0;最大碼字長度 m 不變。當檢測到 length 全 0 但 m 不為 0 時,判斷為 256 個字符全位於碼樹同一層的情況,將 length 最后一個數據修正為 256,繼續后續操作

三、代碼實現

3.1 一些函數

3.1.1 二進制與整數的轉化

def int_to_bytes(n: int) -> bytes:
    """返回整數對應的二進制比特串 例如 50 -> b'\x50'"""
    return bytes([n])


def bytes_to_int(b: bytes) -> int:
    """返回比特串對應的整數 例如 b'\x50' -> 50"""
    return b[0]

3.1.2 壓縮過程可視化

為了讓我們能夠看到壓縮的進度,使用了 tqdm
tqdm 庫可以打印出進度條,實時顯示壓縮與解壓縮過程

需要注意,tqdm 庫並不是 python 的自帶庫,需要通過 pip 或者 conda 安裝

3.1.3 命令行參數

為了使我們的程序更方便他人調用,可以讓其接收命令行的參數,從命令行中傳入待壓縮文件和輸出路徑,而不是通過修改代碼中函數傳入的文件路徑

為此,python 的 argparse 庫可以實現這個功能

需要注意,argparse 庫並不是 python 的自帶庫,需要通過 pip 或者 conda 安裝

3.2 基礎 huffman 編碼

為了使壓縮過程可視化,需要導入 tqdm
為了讓代碼接口更為美觀,需要導入 typing

from tqdm import tqdm
from typing import Dict, List, Tuple

3.2.1 頻率統計

雖然在上面 huffman 編碼的構造中使用的是字符的頻率,但是由於計算機的精度問題,很可能會發生誤差。所以此處采用的是頻數來代替頻率

    def bytes_fre(bytes_str: bytes):
        """統計目標文本的字符頻數, 返回頻數字典
        例如b'\x4F\x56\x4F' -> {b'\x4F':2, b'\x56':1}
        """
        fre_dic = [0 for _ in range(256)]
        for item in bytes_str:
            fre_dic[item] += 1
        return {int_to_bytes(x): fre_dic[x] for x in range(256) if fre_dic[x] != 0}

3.2.2 碼表構建

首先先構造一個類,用於作為二叉樹的結點

成員 value weight lchild rchild
說明 具體的字符 字符的權重(頻數) 左孩子 右孩子
class Node:
    """Node結點,用於構建二叉數"""
    def __init__(self, value, weight, lchild, rchild):
        self.value = value
        self.weight = weight
        self.lchild = lchild
        self.rchild = rchild

然后利用 Node 類構建 huffman 樹,並生成 huffman 編碼表
對於詞頻為空或者只有一個字符的情況,需要特殊考慮

    def build(fre_dic: Dict[bytes, int]) -> Dict[bytes, str]:
        """通過字典構建Huffman編碼,返回對應的編碼字典
        例如 {b'\x4F':1, b'\x56':1} -> {b'\x4F':'0', b'\x56':'1'}
        """

        def dlr(current: Node, huffman_code: str, _huffman_dic: Dict[bytes, str]):
            """遞歸遍歷二叉樹求對應的Huffman編碼"""
            if current is None:
                return
            else:
                if current.lchild is None and current.rchild is None:
                    _huffman_dic[current.value] = huffman_code
                else:
                    dlr(current.lchild, huffman_code + '0', _huffman_dic)
                    dlr(current.rchild, huffman_code + '1', _huffman_dic)

        if not fre_dic:
            return {}
        elif len(fre_dic) == 1:
            return {value: '0' for value in fre_dic.keys()}
        # 初始化森林, 權重weight小的在后
        node_lst = [Node(value, weight, None, None) for value, weight in fre_dic.items()]
        node_lst.sort(key=lambda item: item.weight, reverse=True)
        # 構建Huffman樹
        while len(node_lst) > 1:
            # 合並最后兩棵樹
            node_2 = node_lst.pop()
            node_1 = node_lst.pop()
            node_add = Node(None, node_1.weight + node_2.weight, node_1, node_2)
            node_lst.append(node_add)
            # 調整森林
            index = len(node_lst) - 1
            while index and node_lst[index - 1].weight <= node_add.weight:
                node_lst[index] = node_lst[index - 1]
                index = index - 1
            node_lst[index] = node_add
        # 獲取Huffman編碼
        huffman_dic = {key: '' for key in fre_dic.keys()}
        dlr(node_lst[0], '', huffman_dic)
        return huffman_dic

3.2.3 編碼

    def encode(str_bytes: bytes, huffman_dic: Dict[bytes, str], visualize: bool = False) -> Tuple[bytes, int]:
        """Huffman編碼
        輸入待編碼文本, Huffman字典huffman_dic
        返回末端填充位數padding和編碼后的文本
        """
        bin_buffer = ''
        padding = 0
        # 生成整數->bytes的字典
        dic = [int_to_bytes(item) for item in range(256)]
        # 將bytes字符串轉化成bytes列表
        read_buffer = [dic[item] for item in str_bytes]
        write_buffer = bytearray([])
        # 循環讀入數據,同時編碼輸出
        for item in tqdm(read_buffer, unit='byte', disable=not visualize):
            bin_buffer = bin_buffer + huffman_dic[item]
            while len(bin_buffer) >= 8:
                write_buffer.append(int(bin_buffer[:8:], 2))
                bin_buffer = bin_buffer[8::]

        # 將緩沖區內的數據填充后輸出
        if bin_buffer:
            padding = 8 - len(bin_buffer)
            bin_buffer = bin_buffer.ljust(8, '0')
            write_buffer.append(int(bin_buffer, 2))

        return bytes(write_buffer), padding

3.2.4 解碼

    def decode(str_bytes: bytes, huffman_dic: Dict[bytes, str], padding: int, visualize: bool = False):
        """Huffman解碼
        輸入待編碼文本, Huffman字典huffman_dic, 末端填充位padding
        返回編碼后的文本
        """
        if not huffman_dic:  # 空字典,直接返回
            return b''
        elif len(huffman_dic) == 1:  # 字典長度為1,添加冗余結點,使之后續能夠正常構建碼樹
            huffman_dic[b'OVO'] = 'OVO'
        # 初始化森林, 短碼在前,長碼在后, 長度相等的碼字典序小的在前
        node_lst = [Node(value, weight, None, None) for value, weight in huffman_dic.items()]
        node_lst.sort(key=lambda _item: (len(_item.weight), _item.weight), reverse=False)
        # 構建Huffman樹
        while len(node_lst) > 1:
            # 合並最后兩棵樹
            node_2 = node_lst.pop()
            node_1 = node_lst.pop()
            node_add = Node(None, node_1.weight[:-1:], node_1, node_2)
            node_lst.append(node_add)
            # 調整森林
            node_lst.sort(key=lambda _item: (len(_item.weight), _item.weight), reverse=False)
        # 解密文本
        read_buffer, buffer_size = [], 0
        # 生成字符->二進制列表的映射
        dic = [list(map(int, bin(item)[2::].rjust(8, '0'))) for item in range(256)]
        # 將str_bytes轉化為二進制列表
        for item in str_bytes:
            read_buffer.extend(dic[item])
            buffer_size = buffer_size + 8
        read_buffer = read_buffer[0: buffer_size - padding:]
        buffer_size = buffer_size - padding
        write_buffer = bytearray([])

        current = node_lst[0]

        for pos in tqdm(range(0, buffer_size, 8), unit='byte', disable=not visualize):
            for item in read_buffer[pos:pos + 8]:
                # 根據二進制數移動current
                if item:
                    current = current.rchild
                else:
                    current = current.lchild
                # 到達葉結點,打印字符並重置current
                if current.lchild is None and current.rchild is None:
                    write_buffer.extend(current.value)
                    current = node_lst[0]

        return bytes(write_buffer)

3.3 范式 huffman

3.3.1 碼表的構造

    def rebuild(char_lst: List[bytes], length_lst: List[int]) -> Dict[bytes, str]:
        """以范氏Huffman的形式恢復字典"""
        huffman_dic = {value: '' for value in char_lst}
        current_code = 0
        for i in range(len(char_lst)):
            if i == 0:
                current_code = 0
            else:
                current_code = (current_code + 1) << (length_lst[i] - length_lst[i - 1])
            huffman_dic[char_lst[i]] = bin(current_code)[2::].rjust(length_lst[i], '0')
        return huffman_dic

3.3.2 普通huffman轉范式huffman

    def to_canonical(huffman_dic: Dict[bytes, str]) -> Dict[bytes, str]:
        """將Huffman編碼轉換成范氏Huffman編碼"""
        code_lst = [(value, len(code)) for value, code in huffman_dic.items()]
        code_lst.sort(key=lambda item: (item[1], item[0]), reverse=False)
        value_lst, length_lst = [], []
        for value, length in code_lst:
            value_lst.append(value)
            length_lst.append(length)
        return rebuild(value_lst, length_lst)

3.3.3 封裝

將實現 范式huffman 功能的代碼封裝成 Huffman

函數名稱 函數功能 返回值
bytes_fre 統計給定文本的詞頻 詞頻字典
build 給定詞頻字典構建 huffman 編碼 huffman 碼表
to_canonical 將給定的 huffman 編碼轉化成 范式huffman 編碼 范式huffman 編碼的碼表
rebuild 通過碼表信息重構 范式huffman 編碼 范式huffman 編碼的碼表
encode 使用給定碼表對給定文本編碼 編碼后的文本
decode 使用給定碼表對給定文本解碼 解碼后的文本
class Huffman:
    """Huffman編碼"""

    @staticmethod
    def bytes_fre(bytes_str: bytes):
        """統計目標文本的字符頻數, 返回頻數字典
        例如b'\x4F\x56\x4F' -> {b'\x4F':2, b'\x56':1}
        """
        pass

    @staticmethod
    def build(fre_dic: Dict[bytes, int]) -> Dict[bytes, str]:
        """通過字典構建Huffman編碼,返回對應的編碼字典
        例如 {b'\x4F':1, b'\x56':1} -> {b'\x4F':'0', b'\x56':'1'}
        """
        pass

    @classmethod
    def to_canonical(cls, huffman_dic: Dict[bytes, str]) -> Dict[bytes, str]:
        """將Huffman編碼轉換成范氏Huffman編碼"""
        pass

    @staticmethod
    def rebuild(char_lst: List[bytes], length_lst: List[int]) -> Dict[bytes, str]:
        """以范氏Huffman的形式恢復字典"""
        pass

    @staticmethod
    def decode(str_bytes: bytes, huffman_dic: Dict[bytes, str], padding: int, visualize: bool = False):
        """Huffman解碼
        輸入待編碼文本, Huffman字典huffman_dic, 末端填充位padding
        返回編碼后的文本
        """
        pass

    @staticmethod
    def encode(str_bytes: bytes, huffman_dic: Dict[bytes, str], visualize: bool = False) -> Tuple[bytes, int]:
        """Huffman編碼
        輸入待編碼文本, Huffman字典huffman_dic
        返回末端填充位數padding和編碼后的文本
        """
        pass

3.4 碼表存儲與恢復

將之前的 范式huffman 進一步封裝,完成對碼表的存儲與恢復功能
然后對 Huffman 類進一步封裝,封裝成 OVO 類(名字亂起的

函數名稱 函數功能 返回值
encode 對給定文件進行 huffman 編碼 編碼后的文件名
decode 對給定文件進行 huffman 解碼 解碼后的文件名
encode_as_huffman 對給定文本進行 huffman 編碼 編碼后的文本
decode_as_huffman 對給定文本進行 huffman 解碼 解碼后的文本
class OVO:
    VERBOSE = 0b10  # -v 顯示進度

    @classmethod
    def decode_as_huffman(cls, str_bytes: bytes, mode: int):
        """以huffman編碼解碼
        輸入byte串,返回解碼后的byte串"""
        padding = str_bytes[0]
        max_length = str_bytes[1]
        length = list(str_bytes[2:2 + max_length:])
        char_num = sum(length)
        # 如果length全零,那么表示256個字符全在同一層
        if char_num == 0 and max_length != 0:
            char_num = 256
            length[max_length - 1] = 256
        # 計算出還原huffman碼表所需的信息
        char_lst, length_lst = [], []
        for pos in range(2 + max_length, 2 + max_length + char_num):
            char_lst.append(int_to_bytes(str_bytes[pos]))
        for i in range(max_length):
            length_lst.extend([i + 1] * length[i])
        # 重構碼表
        code_dic = Huffman.rebuild(char_lst, length_lst)
        # huffman解碼
        str_bytes = str_bytes[2 + max_length + char_num::]
        write_buffer = Huffman.decode(str_bytes, code_dic, padding, bool(mode & cls.VERBOSE))
        return write_buffer

    @classmethod
    def encode_as_huffman(cls, str_bytes: bytes, mode: int):
        """以huffman編碼的形式編碼文件
        輸入bytes串,返回編碼后的比特串"""
        fre_dic = Huffman.bytes_fre(str_bytes)
        code_dic = Huffman.build(fre_dic)
        code_dic = Huffman.to_canonical(code_dic)
        max_length = 0
        for code in code_dic.values():
            max_length = max(max_length, len(code))
        length_lst = [0 for _ in range(max_length + 1)]
        for code in code_dic.values():
            length_lst[len(code)] += 1
        # 要是256個字符全部位於同一層,使用全零標記
        if length_lst[max_length] == 256:
            length_lst[max_length] = 0
        length_lst.pop(0)  # 碼長為0的字符並不存在,故刪去
        # 將碼表信息轉化成bytes類型
        code_bytes = b''.join(code_dic.keys())
        length_bytes = b''.join(map(int_to_bytes, length_lst))
        # huffman編碼
        temp_buffer, padding = Huffman.encode(str_bytes, code_dic, bool(mode & cls.VERBOSE))
        # 合並結果
        code_data = int_to_bytes(max_length) + length_bytes + code_bytes
        write_buffer = int_to_bytes(padding) + code_data + temp_buffer
        return write_buffer

    @classmethod
    def decode(cls, source_path: str, target_path: str, mode: int = 0):
        with open(source_path, 'rb') as fp_in:
            with open(target_path, 'wb') as fp_out:
                write_buffer = cls.decode_as_huffman(fp_in.read(), mode)
                fp_out.write(write_buffer)
        return target_path

    @classmethod
    def encode(cls, source_path: str, target_path: str, mode: int = 0):
        with open(source_path, 'rb') as fp_in:
            with open(target_path, 'wb') as fp_out:
                write_buffer = cls.encode_as_huffman(fp_in.read(), mode)
                fp_out.write(write_buffer)
        return target_path

3.5 命令行調用

將上面的 Huffman 類和 OVO 類寫在 OVO.py 文件內

3.5.1 壓縮

在目錄下創建 enc.py 文件,寫入以下內容
命令行第一個參數是待壓縮文件路徑,第二個參數是壓縮后的文件路徑,-v參數是可選參數,添加-v后將會實時顯示壓縮進度

from OVO import OVO
import argparse

parser = argparse.ArgumentParser()
parser.add_argument('source_path', type=str,
                    help='the file_path which you want to encode')
parser.add_argument('target_path', type=str, default=None,
                    help='the file_path which you want to output')
parser.add_argument('-v', '--verbose', help='increase output verbosity',
                    action='store_const', const=OVO.VERBOSE, default=0)


args = parser.parse_args()
mode = args.verbose
try:
    res = OVO.encode(source_path=args.source_path, target_path=args.target_path, mode=mode)
    print('\n{} has been encoded to {}'.format(args.source_path, res))
except FileNotFoundError as e:
    print(e)
except OSError as e:
    print(e)
    print('check your path!')
except Exception as e:
    print(e)

在調用時,我們只需要在控制台進入程序目錄,輸入 python enc.py newfile.txt output.enc 就可以實現對文件 newfile.txt 的壓縮

3.5.2 解壓

與壓縮類似,在新建一個文件 dec.py,寫入以下內容

from OVO import OVO
import argparse

parser = argparse.ArgumentParser()
parser.add_argument('source_path', type=str,
                    help='the file_path which you want to encode')
parser.add_argument('target_path', type=str, default=None,
                    help='the file_path which you want to output')
parser.add_argument('-v', '--verbose', help='increase output verbosity',
                    action='store_const', const=OVO.VERBOSE, default=0)


args = parser.parse_args()
mode = args.verbose
try:
    res = OVO.decode(source_path=args.source_path, target_path=args.target_path, mode=mode)
    print('\n{} has been encoded to {}'.format(args.source_path, res))
except FileNotFoundError as e:
    print(e)
except OSError as e:
    print(e)
    print('check your path!')
except Exception as e:
    print(e)

在命令行輸入 python dec.py output.enc output.dec 即可實現對文件 output.enc 的解壓縮,解壓結果位於 output.dec

3.5.3 效果

隨便找一個大一點的文件(大約20M),然后使用我們的程序進行壓縮
在這里使用了一張圖片

輸入 python enc.py ./test/pic.bmp output.enc -v

因為我的圖片位於 ./test/pic.bmp,故第一個參數就是 ./test/pic.bmp;我想把壓縮結果輸出至當前目錄下的 output.enc 文件中,故第二個參數就是 output.enc
然后添加 -v 參數,實時顯示壓縮進度

然后再對 output.enc 進行解壓

輸入 python dec.py output.enc pic_dec.bmp -v

因為我的壓縮文件位於 output.enc,故第一個參數就是 output.enc;我想把解壓結果輸出至當前目錄下的 pic_dec.bmp 文件中,故第二個參數就是 pic_dec.bmp
然后添加 -v 參數,實時顯示壓縮進度

然后打開文件 pic_dec.bmp,可以正常打開!和之前的 pic.bmp 一模一樣!程序正確!

不放心的話可以使用 winhex 檢查16進制數值,或是通過命令certUtil -hashfile ./test/pic.bmp SHA1certUtil -hashfile pic_dec.bmp SHA1 檢查兩個文件的 hash 值,一致的話就說明文件相同

3.6 完整代碼

3.6.1 OVO.py

from tqdm import tqdm
from typing import Dict, List, Tuple


def int_to_bytes(n: int) -> bytes:
    """返回整數對應的二進制比特串 例如 50 -> b'\x50'"""
    return bytes([n])


class Node:
    """Node結點,用於構建二叉數"""

    def __init__(self, value, weight, lchild, rchild):
        self.value = value
        self.weight = weight
        self.lchild = lchild
        self.rchild = rchild


class Huffman:
    """Huffman編碼"""

    @staticmethod
    def bytes_fre(bytes_str: bytes):
        """統計目標文本的字符頻數, 返回頻數字典
        例如b'\x4F\x56\x4F' -> {b'\x4F':2, b'\x56':1}
        """
        fre_dic = [0 for _ in range(256)]
        for item in bytes_str:
            fre_dic[item] += 1
        return {int_to_bytes(x): fre_dic[x] for x in range(256) if fre_dic[x] != 0}

    @staticmethod
    def build(fre_dic: Dict[bytes, int]) -> Dict[bytes, str]:
        """通過字典構建Huffman編碼,返回對應的編碼字典
        例如 {b'\x4F':1, b'\x56':1} -> {b'\x4F':'0', b'\x56':'1'}
        """

        def dlr(current: Node, huffman_code: str, _huffman_dic: Dict[bytes, str]):
            """遞歸遍歷二叉樹求對應的Huffman編碼"""
            if current is None:
                return
            else:
                if current.lchild is None and current.rchild is None:
                    _huffman_dic[current.value] = huffman_code
                else:
                    dlr(current.lchild, huffman_code + '0', _huffman_dic)
                    dlr(current.rchild, huffman_code + '1', _huffman_dic)

        if not fre_dic:
            return {}
        elif len(fre_dic) == 1:
            return {value: '0' for value in fre_dic.keys()}
        # 初始化森林, 權重weight小的在后
        node_lst = [Node(value, weight, None, None) for value, weight in fre_dic.items()]
        node_lst.sort(key=lambda item: item.weight, reverse=True)
        # 構建Huffman樹
        while len(node_lst) > 1:
            # 合並最后兩棵樹
            node_2 = node_lst.pop()
            node_1 = node_lst.pop()
            node_add = Node(None, node_1.weight + node_2.weight, node_1, node_2)
            node_lst.append(node_add)
            # 調整森林
            index = len(node_lst) - 1
            while index and node_lst[index - 1].weight <= node_add.weight:
                node_lst[index] = node_lst[index - 1]
                index = index - 1
            node_lst[index] = node_add
        # 獲取Huffman編碼
        huffman_dic = {key: '' for key in fre_dic.keys()}
        dlr(node_lst[0], '', huffman_dic)
        return huffman_dic

    @classmethod
    def to_canonical(cls, huffman_dic: Dict[bytes, str]) -> Dict[bytes, str]:
        """將Huffman編碼轉換成范氏Huffman編碼"""
        code_lst = [(value, len(code)) for value, code in huffman_dic.items()]
        code_lst.sort(key=lambda item: (item[1], item[0]), reverse=False)
        value_lst, length_lst = [], []
        for value, length in code_lst:
            value_lst.append(value)
            length_lst.append(length)
        return cls.rebuild(value_lst, length_lst)

    @staticmethod
    def rebuild(char_lst: List[bytes], length_lst: List[int]) -> Dict[bytes, str]:
        """以范氏Huffman的形式恢復字典"""
        huffman_dic = {value: '' for value in char_lst}
        current_code = 0
        for i in range(len(char_lst)):
            if i == 0:
                current_code = 0
            else:
                current_code = (current_code + 1) << (length_lst[i] - length_lst[i - 1])
            huffman_dic[char_lst[i]] = bin(current_code)[2::].rjust(length_lst[i], '0')
        return huffman_dic

    @staticmethod
    def decode(str_bytes: bytes, huffman_dic: Dict[bytes, str], padding: int, visualize: bool = False):
        """Huffman解碼
        輸入待編碼文本, Huffman字典huffman_dic, 末端填充位padding
        返回編碼后的文本
        """
        if not huffman_dic:  # 空字典,直接返回
            return b''
        elif len(huffman_dic) == 1:  # 字典長度為1,添加冗余結點,使之后續能夠正常構建碼樹
            huffman_dic[b'OVO'] = 'OVO'
        # 初始化森林, 短碼在前,長碼在后, 長度相等的碼字典序小的在前
        node_lst = [Node(value, weight, None, None) for value, weight in huffman_dic.items()]
        node_lst.sort(key=lambda _item: (len(_item.weight), _item.weight), reverse=False)
        # 構建Huffman樹
        while len(node_lst) > 1:
            # 合並最后兩棵樹
            node_2 = node_lst.pop()
            node_1 = node_lst.pop()
            node_add = Node(None, node_1.weight[:-1:], node_1, node_2)
            node_lst.append(node_add)
            # 調整森林
            node_lst.sort(key=lambda _item: (len(_item.weight), _item.weight), reverse=False)
        # 解密文本
        read_buffer, buffer_size = [], 0
        # 生成字符->二進制列表的映射
        dic = [list(map(int, bin(item)[2::].rjust(8, '0'))) for item in range(256)]
        # 將str_bytes轉化為二進制列表
        for item in str_bytes:
            read_buffer.extend(dic[item])
            buffer_size = buffer_size + 8
        read_buffer = read_buffer[0: buffer_size - padding:]
        buffer_size = buffer_size - padding
        write_buffer = bytearray([])

        current = node_lst[0]

        for pos in tqdm(range(0, buffer_size, 8), unit='byte', disable=not visualize):
            for item in read_buffer[pos:pos + 8]:
                # 根據二進制數移動current
                if item:
                    current = current.rchild
                else:
                    current = current.lchild
                # 到達葉結點,打印字符並重置current
                if current.lchild is None and current.rchild is None:
                    write_buffer.extend(current.value)
                    current = node_lst[0]

        return bytes(write_buffer)

    @staticmethod
    def encode(str_bytes: bytes, huffman_dic: Dict[bytes, str], visualize: bool = False) -> Tuple[bytes, int]:
        """Huffman編碼
        輸入待編碼文本, Huffman字典huffman_dic
        返回末端填充位數padding和編碼后的文本
        """
        bin_buffer = ''
        padding = 0
        # 生成整數->bytes的字典
        dic = [int_to_bytes(item) for item in range(256)]
        # 將bytes字符串轉化成bytes列表
        read_buffer = [dic[item] for item in str_bytes]
        write_buffer = bytearray([])
        # 循環讀入數據,同時編碼輸出
        for item in tqdm(read_buffer, unit='byte', disable=not visualize):
            bin_buffer = bin_buffer + huffman_dic[item]
            while len(bin_buffer) >= 8:
                write_buffer.append(int(bin_buffer[:8:], 2))
                bin_buffer = bin_buffer[8::]

        # 將緩沖區內的數據填充后輸出
        if bin_buffer:
            padding = 8 - len(bin_buffer)
            bin_buffer = bin_buffer.ljust(8, '0')
            write_buffer.append(int(bin_buffer, 2))

        return bytes(write_buffer), padding


class OVO:
    VERBOSE = 0b10  # -v 顯示進度

    @classmethod
    def decode_as_huffman(cls, str_bytes: bytes, mode: int):
        """以huffman編碼解碼
        輸入byte串,返回解碼后的byte串"""
        padding = str_bytes[0]
        max_length = str_bytes[1]
        length = list(str_bytes[2:2 + max_length:])
        char_num = sum(length)
        # 如果length全零,那么表示256個字符全在同一層
        if char_num == 0 and max_length != 0:
            char_num = 256
            length[max_length - 1] = 256
        # 計算出還原huffman碼表所需的信息
        char_lst, length_lst = [], []
        for pos in range(2 + max_length, 2 + max_length + char_num):
            char_lst.append(int_to_bytes(str_bytes[pos]))
        for i in range(max_length):
            length_lst.extend([i + 1] * length[i])
        # 重構碼表
        code_dic = Huffman.rebuild(char_lst, length_lst)
        # huffman解碼
        str_bytes = str_bytes[2 + max_length + char_num::]
        write_buffer = Huffman.decode(str_bytes, code_dic, padding, bool(mode & cls.VERBOSE))
        return write_buffer

    @classmethod
    def encode_as_huffman(cls, str_bytes: bytes, mode: int):
        """以huffman編碼的形式編碼文件
        輸入bytes串,返回編碼后的比特串"""
        fre_dic = Huffman.bytes_fre(str_bytes)
        code_dic = Huffman.build(fre_dic)
        code_dic = Huffman.to_canonical(code_dic)
        max_length = 0
        for code in code_dic.values():
            max_length = max(max_length, len(code))
        length_lst = [0 for _ in range(max_length + 1)]
        for code in code_dic.values():
            length_lst[len(code)] += 1
        # 要是256個字符全部位於同一層,使用全零標記
        if length_lst[max_length] == 256:
            length_lst[max_length] = 0
        length_lst.pop(0)  # 碼長為0的字符並不存在,故刪去
        # 將碼表信息轉化成bytes類型
        code_bytes = b''.join(code_dic.keys())
        length_bytes = b''.join(map(int_to_bytes, length_lst))
        # huffman編碼
        temp_buffer, padding = Huffman.encode(str_bytes, code_dic, bool(mode & cls.VERBOSE))
        # 合並結果
        code_data = int_to_bytes(max_length) + length_bytes + code_bytes
        write_buffer = int_to_bytes(padding) + code_data + temp_buffer
        return write_buffer

    @classmethod
    def decode(cls, source_path: str, target_path: str, mode: int = 0):
        with open(source_path, 'rb') as fp_in:
            with open(target_path, 'wb') as fp_out:
                write_buffer = cls.decode_as_huffman(fp_in.read(), mode)
                fp_out.write(write_buffer)
        return target_path

    @classmethod
    def encode(cls, source_path: str, target_path: str, mode: int = 0):
        with open(source_path, 'rb') as fp_in:
            with open(target_path, 'wb') as fp_out:
                write_buffer = cls.encode_as_huffman(fp_in.read(), mode)
                fp_out.write(write_buffer)
        return target_path

3.6.2 enc.py

from OVO import OVO
import argparse

parser = argparse.ArgumentParser()
parser.add_argument('source_path', type=str,
                    help='the file_path which you want to encode')
parser.add_argument('target_path', type=str, default=None,
                    help='the file_path which you want to output')
parser.add_argument('-v', '--verbose', help='increase output verbosity',
                    action='store_const', const=OVO.VERBOSE, default=0)


args = parser.parse_args()
mode = args.verbose
try:
    res = OVO.encode(source_path=args.source_path, target_path=args.target_path, mode=mode)
    print('\n{} has been encoded to {}'.format(args.source_path, res))
except FileNotFoundError as e:
    print(e)
except OSError as e:
    print(e)
    print('check your path!')
except Exception as e:
    print(e)

3.6.3 dec.py

from OVO import OVO
import argparse

parser = argparse.ArgumentParser()
parser.add_argument('source_path', type=str,
                    help='the file_path which you want to encode')
parser.add_argument('target_path', type=str, default=None,
                    help='the file_path which you want to output')
parser.add_argument('-v', '--verbose', help='increase output verbosity',
                    action='store_const', const=OVO.VERBOSE, default=0)


args = parser.parse_args()
mode = args.verbose
try:
    res = OVO.decode(source_path=args.source_path, target_path=args.target_path, mode=mode)
    print('\n{} has been encoded to {}'.format(args.source_path, res))
except FileNotFoundError as e:
    print(e)
except OSError as e:
    print(e)
    print('check your path!')
except Exception as e:
    print(e)


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM