一、huffman 編碼
1.1 huffman 編碼介紹
哈夫曼編碼(Huffman Coding),又稱霍夫曼編碼,是一種編碼方式,哈夫曼編碼是可變字長編碼(VLC)的一種。Huffman於1952年提出一種編碼方法,該方法完全依據字符出現概率來構造異字頭的平均長度最短的碼字,有時稱之為最佳編碼,一般就叫做Huffman編碼(有時也稱為霍夫曼編碼)
huffman 編碼是最優碼,也是即時碼(證明較為復雜,在此不給出證明)
1.2 huffman 編碼
此處介紹二元 huffman 編碼
給定一個字符集合 \(S=\{s_0,s_1,\cdots,s_q\}\),每個字符的概率為 \(P=\{p_0,p_1,\cdots,p_q\}\)
-
將字符集合按概率由大到小重新排列,即使得 \(p_i ≥ p_{i+1}\)
-
將最末尾的兩個字符 \(s_{q-1}\) 和 \(s_q\)合並,記為 \(s'\),\(s'\) 的概率為 \(p'=p_{q-1}+p_q\)
-
如果剩余的字符數不為 1,則回到第 1 步
huffman 編碼的過程大致就上述三步
當編碼完畢后,會構建出一棵碼樹,每個碼字的編碼可以從碼樹中獲得
舉個簡單的例子,對於字符集合 \(S=\{A,B,C\}\),概率為 \(P={0.5,0.3,0.2}\)
-
將 \(B,C\) 合並,記為 \(BC\),\(p(BC)=p(B)+p(C)=0.3+0.2=0.5\)
-
然后將 \(A,BC\) 合並,記為 \(ABC\),\(p(ABC)=p(A)+p(BC)=0.5+0.5=1\)
記根節點 ABC 碼字為空,從根節點向下遍歷,往左走碼字末尾添0,往右走添1,那么 A, B, C 的碼字分別為 0, 10, 11,編碼表就是 \(\{A:0,B:10,C:11\}\)
1.3 huffman 編程實現
見后文的代碼實現
1.4 測試
首先在代碼目錄下新建一個newfile.txt,里邊寫入ABCDEFG
下面的代碼實現了讀取newfile.txt並對其壓縮,將結果輸出至output.enc
然后對output.enc進行解壓縮,將解壓縮結果輸出至output.dec
# 讀入文件
with open('newfile.txt', 'rb') as fp_in:
str_bytes = fp_in.read()
# 構建huffman編碼
fre_dic = bytes_fre(str_bytes)
huffman_dic = build(fre_dic)
# 對文本進行編碼
str_enc, padding = encode(str_bytes, huffman_dic, False)
# 輸出至文件 output.enc
with open('output.enc', 'wb') as fp_out:
fp_out.write(str_enc)
# 對編碼后的文本進行解碼
str_dec = decode(str_enc, huffman_dic, padding, False)
# 輸出至文件 output.dec
with open('output.dec', 'wb') as fp_out:
fp_out.write(str_dec)
# 打印huffman字典和填充位
print('huffman_dic:', huffman_dic)
print('padding:', padding)
觀察壓縮前和壓縮后的文件,發現經過我們的程序壓縮之后,文件小了很多

使用 winhex 打開 newfile.txt, output.enc, output.dec 查看對應的十六進制數值

觀察 newfile.txt 和 output.dec 的十六進制數值,發現一模一樣,說明我們的壓縮和解壓並沒有問題,程序正確
接下來來分析 output.enc 文件的內容
output.enc 中的數值為 C1 4E 50,轉化成二進制為 11000001 01001110 01010000
我們將中間構造出的編碼表打印出來,可以得到編碼表
| 字符 | A | B | C | D | E | F | G |
|---|---|---|---|---|---|---|---|
| 編碼 | 11 | 000 | 001 | 010 | 011 | 100 | 101 |
以及填充位 padding 的值為 4
我們對 newfile.txt 中的字符 ABCDEFG 進行編碼,結果為 11 000 001 010 011 100 101。按 8 位二進制數一組划分后的結果為 11000001 01001110 0101,最后一組不滿 8 位,需要補充 4 個 0,padding 值為 4,最后結果為 11000001 01001110 01010000,即 C1 4E 50
我們通過手算得出的編碼輸出和程序的編碼輸出一致!
說明程序正確!
二、利用 huffman 編碼進行數據壓縮
上面我們已經介紹了數據壓縮的原理與最優碼 huffman 編碼,我們可以使用 huffman 編碼對原先用 ASCII 碼編碼的數據重新編碼,以達到數據壓縮的目的
2.1 壓縮
上面已經給出了 huffman 編碼壓縮的算法,而且也有很不錯的壓縮效率。但需要注意的是,無論是壓縮的時候,還是解壓的時候,我們都用到了從原文件中生成的 huffman 編碼表和填充位數
思考一下我們平時的壓縮軟件,給它一個壓縮包,它就能自動解壓出原先的文件。但我們剛剛編寫的程序似乎不行,它解碼的時候需要用到一個編碼表,而這個編碼表只能從未編碼的文件中獲得,我們沒辦法只靠一個壓縮后的文件就能恢復出源文件
那么,我們需要將編碼表一並存入壓縮文件中,在解壓的時候讀取編碼表進行解壓
2.2 碼表存儲
怎么存碼表?這是一個非常復雜的問題。一個簡單的方法是直接用明文存儲,就像下圖那樣

事實上,根本不會有任何壓縮軟件采用這種方式存儲碼表,因為這占用非常大的存儲空間。就拿上面的例子來說,一個 ASCII 字符占用 \(8\) 個二進制位,一共有 \(13\) 個字符,那么共占用了 \(8 \times 13 = 104\) 個二進制位!
看上去似乎並不大,但如果待編碼字符數量增加到 256 個,最好的情況是每個碼字都占用 \(8\) 個二進制位,那么存儲碼表需要 \(256 \times (8+8+8 \times 8) = 20480\) 個二進制位!也就是 \(\frac{20480}{8 \times 1024} = 2.5\) kb !
好吧,似乎看起來也不算太大,但它其實可以更小!
2.2.1 范式 huffman 編碼
范式huffman編碼對原先 huffman 編碼的碼樹進行重構,使之變成具有特定規律的碼樹:位於碼樹同一層的碼字,字典序小的靠左,字典序大的靠右,有后代的結點位於沒有后代的結點的右側

范式huffman 碼字具有以下特點
-
將碼樹的碼字由上至下,由左至右依次排列成 \(w_0,w_1,\cdots,w_n\)
-
對於同一層的碼字,最左側的碼字總是形如 \(x\)0,之后的碼字 \(w_i\) 都是其左側碼字的值加 1,即 \(w_i = w_{i-1} + 1\)
-
對於不在同一層的碼字,設間隔層數為 \(d\),則 \(w_i = (w_{i-1}+1)\)0\(\cdots\)0 (\(d\)個\(0\))
以右圖為例,\(A,B,C,D\) 四個字符對應的碼字剛好符合上述特點
| 字符 | A | B | C | D |
|---|---|---|---|---|
| 碼字 | 0 | 10 | 110 | 111 |
還原的時候,只需要知道字符序列 \(c_i\) 和對應的碼字長度 \(l_i\),即可還原出最初的碼字序列 \(w_i\)。根據碼字長度可以確定出碼字所在碼樹的層數,再結合 \(w_i\) 和 \(w_{i-1}\) 的遞推關系,便可唯一確定碼字序列
在構建 范式huffman 碼表的時候,並不需要手動調節先前通過 huffman 編碼所得的碼樹,只需要利用 huffman 編碼所得到的碼長序列,並按規則從新排列字符序列,傳入 范式huffman 編碼的構造函數之中,便可得到對應的碼表
2.2.2 范式 huffman 的碼表存儲
|占用|1字節|1字節|m字節|n字節|x字節|
|-|-|-|-|-|-|-|
|含義|填充位數|最大碼字長度m|\([1,m]\)長度的碼字數量|字符序列|數據部分|
|記號|padding|m|length|char_lst|data|
通過讀取 m 確定最大碼字長度,后續的 m 個字節為 \([1,m]\) 長度的碼字數目,進行處理可以獲得碼字長度序列 length_lst。對 length 求和可以獲得字符個數 n,后續的 n 個字節為對應的字符序列 char_lst。通過 char_lst 和 length_lst 可以還原出碼表
由於所有的字符數量為 256 個,最深碼樹深度為 255,故 m 並不會溢出(一個字節所能表示的數據范圍為\([0,255]\))
但需要注意的是,若所有 256 個字符全部位於碼樹同一層,將無法用一個字節表示該層的碼字數量。對於這種情況,將所有的碼字數量全部置 0,即 length 全0;最大碼字長度 m 不變。當檢測到 length 全 0 但 m 不為 0 時,判斷為 256 個字符全位於碼樹同一層的情況,將 length 最后一個數據修正為 256,繼續后續操作
三、代碼實現
3.1 一些函數
3.1.1 二進制與整數的轉化
def int_to_bytes(n: int) -> bytes:
"""返回整數對應的二進制比特串 例如 50 -> b'\x50'"""
return bytes([n])
def bytes_to_int(b: bytes) -> int:
"""返回比特串對應的整數 例如 b'\x50' -> 50"""
return b[0]
3.1.2 壓縮過程可視化
為了讓我們能夠看到壓縮的進度,使用了 tqdm 庫
tqdm 庫可以打印出進度條,實時顯示壓縮與解壓縮過程
需要注意,tqdm 庫並不是 python 的自帶庫,需要通過 pip 或者 conda 安裝
3.1.3 命令行參數
為了使我們的程序更方便他人調用,可以讓其接收命令行的參數,從命令行中傳入待壓縮文件和輸出路徑,而不是通過修改代碼中函數傳入的文件路徑
為此,python 的 argparse 庫可以實現這個功能
需要注意,argparse 庫並不是 python 的自帶庫,需要通過 pip 或者 conda 安裝
3.2 基礎 huffman 編碼
為了使壓縮過程可視化,需要導入 tqdm 庫
為了讓代碼接口更為美觀,需要導入 typing 庫
from tqdm import tqdm
from typing import Dict, List, Tuple
3.2.1 頻率統計
雖然在上面 huffman 編碼的構造中使用的是字符的頻率,但是由於計算機的精度問題,很可能會發生誤差。所以此處采用的是頻數來代替頻率
def bytes_fre(bytes_str: bytes):
"""統計目標文本的字符頻數, 返回頻數字典
例如b'\x4F\x56\x4F' -> {b'\x4F':2, b'\x56':1}
"""
fre_dic = [0 for _ in range(256)]
for item in bytes_str:
fre_dic[item] += 1
return {int_to_bytes(x): fre_dic[x] for x in range(256) if fre_dic[x] != 0}
3.2.2 碼表構建
首先先構造一個類,用於作為二叉樹的結點
| 成員 | value | weight | lchild | rchild |
|---|---|---|---|---|
| 說明 | 具體的字符 | 字符的權重(頻數) | 左孩子 | 右孩子 |
class Node:
"""Node結點,用於構建二叉數"""
def __init__(self, value, weight, lchild, rchild):
self.value = value
self.weight = weight
self.lchild = lchild
self.rchild = rchild
然后利用 Node 類構建 huffman 樹,並生成 huffman 編碼表
對於詞頻為空或者只有一個字符的情況,需要特殊考慮
def build(fre_dic: Dict[bytes, int]) -> Dict[bytes, str]:
"""通過字典構建Huffman編碼,返回對應的編碼字典
例如 {b'\x4F':1, b'\x56':1} -> {b'\x4F':'0', b'\x56':'1'}
"""
def dlr(current: Node, huffman_code: str, _huffman_dic: Dict[bytes, str]):
"""遞歸遍歷二叉樹求對應的Huffman編碼"""
if current is None:
return
else:
if current.lchild is None and current.rchild is None:
_huffman_dic[current.value] = huffman_code
else:
dlr(current.lchild, huffman_code + '0', _huffman_dic)
dlr(current.rchild, huffman_code + '1', _huffman_dic)
if not fre_dic:
return {}
elif len(fre_dic) == 1:
return {value: '0' for value in fre_dic.keys()}
# 初始化森林, 權重weight小的在后
node_lst = [Node(value, weight, None, None) for value, weight in fre_dic.items()]
node_lst.sort(key=lambda item: item.weight, reverse=True)
# 構建Huffman樹
while len(node_lst) > 1:
# 合並最后兩棵樹
node_2 = node_lst.pop()
node_1 = node_lst.pop()
node_add = Node(None, node_1.weight + node_2.weight, node_1, node_2)
node_lst.append(node_add)
# 調整森林
index = len(node_lst) - 1
while index and node_lst[index - 1].weight <= node_add.weight:
node_lst[index] = node_lst[index - 1]
index = index - 1
node_lst[index] = node_add
# 獲取Huffman編碼
huffman_dic = {key: '' for key in fre_dic.keys()}
dlr(node_lst[0], '', huffman_dic)
return huffman_dic
3.2.3 編碼
def encode(str_bytes: bytes, huffman_dic: Dict[bytes, str], visualize: bool = False) -> Tuple[bytes, int]:
"""Huffman編碼
輸入待編碼文本, Huffman字典huffman_dic
返回末端填充位數padding和編碼后的文本
"""
bin_buffer = ''
padding = 0
# 生成整數->bytes的字典
dic = [int_to_bytes(item) for item in range(256)]
# 將bytes字符串轉化成bytes列表
read_buffer = [dic[item] for item in str_bytes]
write_buffer = bytearray([])
# 循環讀入數據,同時編碼輸出
for item in tqdm(read_buffer, unit='byte', disable=not visualize):
bin_buffer = bin_buffer + huffman_dic[item]
while len(bin_buffer) >= 8:
write_buffer.append(int(bin_buffer[:8:], 2))
bin_buffer = bin_buffer[8::]
# 將緩沖區內的數據填充后輸出
if bin_buffer:
padding = 8 - len(bin_buffer)
bin_buffer = bin_buffer.ljust(8, '0')
write_buffer.append(int(bin_buffer, 2))
return bytes(write_buffer), padding
3.2.4 解碼
def decode(str_bytes: bytes, huffman_dic: Dict[bytes, str], padding: int, visualize: bool = False):
"""Huffman解碼
輸入待編碼文本, Huffman字典huffman_dic, 末端填充位padding
返回編碼后的文本
"""
if not huffman_dic: # 空字典,直接返回
return b''
elif len(huffman_dic) == 1: # 字典長度為1,添加冗余結點,使之后續能夠正常構建碼樹
huffman_dic[b'OVO'] = 'OVO'
# 初始化森林, 短碼在前,長碼在后, 長度相等的碼字典序小的在前
node_lst = [Node(value, weight, None, None) for value, weight in huffman_dic.items()]
node_lst.sort(key=lambda _item: (len(_item.weight), _item.weight), reverse=False)
# 構建Huffman樹
while len(node_lst) > 1:
# 合並最后兩棵樹
node_2 = node_lst.pop()
node_1 = node_lst.pop()
node_add = Node(None, node_1.weight[:-1:], node_1, node_2)
node_lst.append(node_add)
# 調整森林
node_lst.sort(key=lambda _item: (len(_item.weight), _item.weight), reverse=False)
# 解密文本
read_buffer, buffer_size = [], 0
# 生成字符->二進制列表的映射
dic = [list(map(int, bin(item)[2::].rjust(8, '0'))) for item in range(256)]
# 將str_bytes轉化為二進制列表
for item in str_bytes:
read_buffer.extend(dic[item])
buffer_size = buffer_size + 8
read_buffer = read_buffer[0: buffer_size - padding:]
buffer_size = buffer_size - padding
write_buffer = bytearray([])
current = node_lst[0]
for pos in tqdm(range(0, buffer_size, 8), unit='byte', disable=not visualize):
for item in read_buffer[pos:pos + 8]:
# 根據二進制數移動current
if item:
current = current.rchild
else:
current = current.lchild
# 到達葉結點,打印字符並重置current
if current.lchild is None and current.rchild is None:
write_buffer.extend(current.value)
current = node_lst[0]
return bytes(write_buffer)
3.3 范式 huffman
3.3.1 碼表的構造
def rebuild(char_lst: List[bytes], length_lst: List[int]) -> Dict[bytes, str]:
"""以范氏Huffman的形式恢復字典"""
huffman_dic = {value: '' for value in char_lst}
current_code = 0
for i in range(len(char_lst)):
if i == 0:
current_code = 0
else:
current_code = (current_code + 1) << (length_lst[i] - length_lst[i - 1])
huffman_dic[char_lst[i]] = bin(current_code)[2::].rjust(length_lst[i], '0')
return huffman_dic
3.3.2 普通huffman轉范式huffman
def to_canonical(huffman_dic: Dict[bytes, str]) -> Dict[bytes, str]:
"""將Huffman編碼轉換成范氏Huffman編碼"""
code_lst = [(value, len(code)) for value, code in huffman_dic.items()]
code_lst.sort(key=lambda item: (item[1], item[0]), reverse=False)
value_lst, length_lst = [], []
for value, length in code_lst:
value_lst.append(value)
length_lst.append(length)
return rebuild(value_lst, length_lst)
3.3.3 封裝
將實現 范式huffman 功能的代碼封裝成 Huffman 類
| 函數名稱 | 函數功能 | 返回值 |
|---|---|---|
| bytes_fre | 統計給定文本的詞頻 | 詞頻字典 |
| build | 給定詞頻字典構建 huffman 編碼 | huffman 碼表 |
| to_canonical | 將給定的 huffman 編碼轉化成 范式huffman 編碼 | 范式huffman 編碼的碼表 |
| rebuild | 通過碼表信息重構 范式huffman 編碼 | 范式huffman 編碼的碼表 |
| encode | 使用給定碼表對給定文本編碼 | 編碼后的文本 |
| decode | 使用給定碼表對給定文本解碼 | 解碼后的文本 |
class Huffman:
"""Huffman編碼"""
@staticmethod
def bytes_fre(bytes_str: bytes):
"""統計目標文本的字符頻數, 返回頻數字典
例如b'\x4F\x56\x4F' -> {b'\x4F':2, b'\x56':1}
"""
pass
@staticmethod
def build(fre_dic: Dict[bytes, int]) -> Dict[bytes, str]:
"""通過字典構建Huffman編碼,返回對應的編碼字典
例如 {b'\x4F':1, b'\x56':1} -> {b'\x4F':'0', b'\x56':'1'}
"""
pass
@classmethod
def to_canonical(cls, huffman_dic: Dict[bytes, str]) -> Dict[bytes, str]:
"""將Huffman編碼轉換成范氏Huffman編碼"""
pass
@staticmethod
def rebuild(char_lst: List[bytes], length_lst: List[int]) -> Dict[bytes, str]:
"""以范氏Huffman的形式恢復字典"""
pass
@staticmethod
def decode(str_bytes: bytes, huffman_dic: Dict[bytes, str], padding: int, visualize: bool = False):
"""Huffman解碼
輸入待編碼文本, Huffman字典huffman_dic, 末端填充位padding
返回編碼后的文本
"""
pass
@staticmethod
def encode(str_bytes: bytes, huffman_dic: Dict[bytes, str], visualize: bool = False) -> Tuple[bytes, int]:
"""Huffman編碼
輸入待編碼文本, Huffman字典huffman_dic
返回末端填充位數padding和編碼后的文本
"""
pass
3.4 碼表存儲與恢復
將之前的 范式huffman 進一步封裝,完成對碼表的存儲與恢復功能
然后對 Huffman 類進一步封裝,封裝成 OVO 類(名字亂起的
| 函數名稱 | 函數功能 | 返回值 |
|---|---|---|
| encode | 對給定文件進行 huffman 編碼 | 編碼后的文件名 |
| decode | 對給定文件進行 huffman 解碼 | 解碼后的文件名 |
| encode_as_huffman | 對給定文本進行 huffman 編碼 | 編碼后的文本 |
| decode_as_huffman | 對給定文本進行 huffman 解碼 | 解碼后的文本 |
class OVO:
VERBOSE = 0b10 # -v 顯示進度
@classmethod
def decode_as_huffman(cls, str_bytes: bytes, mode: int):
"""以huffman編碼解碼
輸入byte串,返回解碼后的byte串"""
padding = str_bytes[0]
max_length = str_bytes[1]
length = list(str_bytes[2:2 + max_length:])
char_num = sum(length)
# 如果length全零,那么表示256個字符全在同一層
if char_num == 0 and max_length != 0:
char_num = 256
length[max_length - 1] = 256
# 計算出還原huffman碼表所需的信息
char_lst, length_lst = [], []
for pos in range(2 + max_length, 2 + max_length + char_num):
char_lst.append(int_to_bytes(str_bytes[pos]))
for i in range(max_length):
length_lst.extend([i + 1] * length[i])
# 重構碼表
code_dic = Huffman.rebuild(char_lst, length_lst)
# huffman解碼
str_bytes = str_bytes[2 + max_length + char_num::]
write_buffer = Huffman.decode(str_bytes, code_dic, padding, bool(mode & cls.VERBOSE))
return write_buffer
@classmethod
def encode_as_huffman(cls, str_bytes: bytes, mode: int):
"""以huffman編碼的形式編碼文件
輸入bytes串,返回編碼后的比特串"""
fre_dic = Huffman.bytes_fre(str_bytes)
code_dic = Huffman.build(fre_dic)
code_dic = Huffman.to_canonical(code_dic)
max_length = 0
for code in code_dic.values():
max_length = max(max_length, len(code))
length_lst = [0 for _ in range(max_length + 1)]
for code in code_dic.values():
length_lst[len(code)] += 1
# 要是256個字符全部位於同一層,使用全零標記
if length_lst[max_length] == 256:
length_lst[max_length] = 0
length_lst.pop(0) # 碼長為0的字符並不存在,故刪去
# 將碼表信息轉化成bytes類型
code_bytes = b''.join(code_dic.keys())
length_bytes = b''.join(map(int_to_bytes, length_lst))
# huffman編碼
temp_buffer, padding = Huffman.encode(str_bytes, code_dic, bool(mode & cls.VERBOSE))
# 合並結果
code_data = int_to_bytes(max_length) + length_bytes + code_bytes
write_buffer = int_to_bytes(padding) + code_data + temp_buffer
return write_buffer
@classmethod
def decode(cls, source_path: str, target_path: str, mode: int = 0):
with open(source_path, 'rb') as fp_in:
with open(target_path, 'wb') as fp_out:
write_buffer = cls.decode_as_huffman(fp_in.read(), mode)
fp_out.write(write_buffer)
return target_path
@classmethod
def encode(cls, source_path: str, target_path: str, mode: int = 0):
with open(source_path, 'rb') as fp_in:
with open(target_path, 'wb') as fp_out:
write_buffer = cls.encode_as_huffman(fp_in.read(), mode)
fp_out.write(write_buffer)
return target_path
3.5 命令行調用
將上面的 Huffman 類和 OVO 類寫在 OVO.py 文件內
3.5.1 壓縮
在目錄下創建 enc.py 文件,寫入以下內容
命令行第一個參數是待壓縮文件路徑,第二個參數是壓縮后的文件路徑,-v參數是可選參數,添加-v后將會實時顯示壓縮進度
from OVO import OVO
import argparse
parser = argparse.ArgumentParser()
parser.add_argument('source_path', type=str,
help='the file_path which you want to encode')
parser.add_argument('target_path', type=str, default=None,
help='the file_path which you want to output')
parser.add_argument('-v', '--verbose', help='increase output verbosity',
action='store_const', const=OVO.VERBOSE, default=0)
args = parser.parse_args()
mode = args.verbose
try:
res = OVO.encode(source_path=args.source_path, target_path=args.target_path, mode=mode)
print('\n{} has been encoded to {}'.format(args.source_path, res))
except FileNotFoundError as e:
print(e)
except OSError as e:
print(e)
print('check your path!')
except Exception as e:
print(e)
在調用時,我們只需要在控制台進入程序目錄,輸入 python enc.py newfile.txt output.enc 就可以實現對文件 newfile.txt 的壓縮
3.5.2 解壓
與壓縮類似,在新建一個文件 dec.py,寫入以下內容
from OVO import OVO
import argparse
parser = argparse.ArgumentParser()
parser.add_argument('source_path', type=str,
help='the file_path which you want to encode')
parser.add_argument('target_path', type=str, default=None,
help='the file_path which you want to output')
parser.add_argument('-v', '--verbose', help='increase output verbosity',
action='store_const', const=OVO.VERBOSE, default=0)
args = parser.parse_args()
mode = args.verbose
try:
res = OVO.decode(source_path=args.source_path, target_path=args.target_path, mode=mode)
print('\n{} has been encoded to {}'.format(args.source_path, res))
except FileNotFoundError as e:
print(e)
except OSError as e:
print(e)
print('check your path!')
except Exception as e:
print(e)
在命令行輸入 python dec.py output.enc output.dec 即可實現對文件 output.enc 的解壓縮,解壓結果位於 output.dec
3.5.3 效果
隨便找一個大一點的文件(大約20M),然后使用我們的程序進行壓縮
在這里使用了一張圖片
輸入 python enc.py ./test/pic.bmp output.enc -v
因為我的圖片位於 ./test/pic.bmp,故第一個參數就是 ./test/pic.bmp;我想把壓縮結果輸出至當前目錄下的 output.enc 文件中,故第二個參數就是 output.enc
然后添加 -v 參數,實時顯示壓縮進度

然后再對 output.enc 進行解壓
輸入 python dec.py output.enc pic_dec.bmp -v
因為我的壓縮文件位於 output.enc,故第一個參數就是 output.enc;我想把解壓結果輸出至當前目錄下的 pic_dec.bmp 文件中,故第二個參數就是 pic_dec.bmp
然后添加 -v 參數,實時顯示壓縮進度

然后打開文件 pic_dec.bmp,可以正常打開!和之前的 pic.bmp 一模一樣!程序正確!
不放心的話可以使用 winhex 檢查16進制數值,或是通過命令certUtil -hashfile ./test/pic.bmp SHA1 和 certUtil -hashfile pic_dec.bmp SHA1 檢查兩個文件的 hash 值,一致的話就說明文件相同

3.6 完整代碼
3.6.1 OVO.py
from tqdm import tqdm
from typing import Dict, List, Tuple
def int_to_bytes(n: int) -> bytes:
"""返回整數對應的二進制比特串 例如 50 -> b'\x50'"""
return bytes([n])
class Node:
"""Node結點,用於構建二叉數"""
def __init__(self, value, weight, lchild, rchild):
self.value = value
self.weight = weight
self.lchild = lchild
self.rchild = rchild
class Huffman:
"""Huffman編碼"""
@staticmethod
def bytes_fre(bytes_str: bytes):
"""統計目標文本的字符頻數, 返回頻數字典
例如b'\x4F\x56\x4F' -> {b'\x4F':2, b'\x56':1}
"""
fre_dic = [0 for _ in range(256)]
for item in bytes_str:
fre_dic[item] += 1
return {int_to_bytes(x): fre_dic[x] for x in range(256) if fre_dic[x] != 0}
@staticmethod
def build(fre_dic: Dict[bytes, int]) -> Dict[bytes, str]:
"""通過字典構建Huffman編碼,返回對應的編碼字典
例如 {b'\x4F':1, b'\x56':1} -> {b'\x4F':'0', b'\x56':'1'}
"""
def dlr(current: Node, huffman_code: str, _huffman_dic: Dict[bytes, str]):
"""遞歸遍歷二叉樹求對應的Huffman編碼"""
if current is None:
return
else:
if current.lchild is None and current.rchild is None:
_huffman_dic[current.value] = huffman_code
else:
dlr(current.lchild, huffman_code + '0', _huffman_dic)
dlr(current.rchild, huffman_code + '1', _huffman_dic)
if not fre_dic:
return {}
elif len(fre_dic) == 1:
return {value: '0' for value in fre_dic.keys()}
# 初始化森林, 權重weight小的在后
node_lst = [Node(value, weight, None, None) for value, weight in fre_dic.items()]
node_lst.sort(key=lambda item: item.weight, reverse=True)
# 構建Huffman樹
while len(node_lst) > 1:
# 合並最后兩棵樹
node_2 = node_lst.pop()
node_1 = node_lst.pop()
node_add = Node(None, node_1.weight + node_2.weight, node_1, node_2)
node_lst.append(node_add)
# 調整森林
index = len(node_lst) - 1
while index and node_lst[index - 1].weight <= node_add.weight:
node_lst[index] = node_lst[index - 1]
index = index - 1
node_lst[index] = node_add
# 獲取Huffman編碼
huffman_dic = {key: '' for key in fre_dic.keys()}
dlr(node_lst[0], '', huffman_dic)
return huffman_dic
@classmethod
def to_canonical(cls, huffman_dic: Dict[bytes, str]) -> Dict[bytes, str]:
"""將Huffman編碼轉換成范氏Huffman編碼"""
code_lst = [(value, len(code)) for value, code in huffman_dic.items()]
code_lst.sort(key=lambda item: (item[1], item[0]), reverse=False)
value_lst, length_lst = [], []
for value, length in code_lst:
value_lst.append(value)
length_lst.append(length)
return cls.rebuild(value_lst, length_lst)
@staticmethod
def rebuild(char_lst: List[bytes], length_lst: List[int]) -> Dict[bytes, str]:
"""以范氏Huffman的形式恢復字典"""
huffman_dic = {value: '' for value in char_lst}
current_code = 0
for i in range(len(char_lst)):
if i == 0:
current_code = 0
else:
current_code = (current_code + 1) << (length_lst[i] - length_lst[i - 1])
huffman_dic[char_lst[i]] = bin(current_code)[2::].rjust(length_lst[i], '0')
return huffman_dic
@staticmethod
def decode(str_bytes: bytes, huffman_dic: Dict[bytes, str], padding: int, visualize: bool = False):
"""Huffman解碼
輸入待編碼文本, Huffman字典huffman_dic, 末端填充位padding
返回編碼后的文本
"""
if not huffman_dic: # 空字典,直接返回
return b''
elif len(huffman_dic) == 1: # 字典長度為1,添加冗余結點,使之后續能夠正常構建碼樹
huffman_dic[b'OVO'] = 'OVO'
# 初始化森林, 短碼在前,長碼在后, 長度相等的碼字典序小的在前
node_lst = [Node(value, weight, None, None) for value, weight in huffman_dic.items()]
node_lst.sort(key=lambda _item: (len(_item.weight), _item.weight), reverse=False)
# 構建Huffman樹
while len(node_lst) > 1:
# 合並最后兩棵樹
node_2 = node_lst.pop()
node_1 = node_lst.pop()
node_add = Node(None, node_1.weight[:-1:], node_1, node_2)
node_lst.append(node_add)
# 調整森林
node_lst.sort(key=lambda _item: (len(_item.weight), _item.weight), reverse=False)
# 解密文本
read_buffer, buffer_size = [], 0
# 生成字符->二進制列表的映射
dic = [list(map(int, bin(item)[2::].rjust(8, '0'))) for item in range(256)]
# 將str_bytes轉化為二進制列表
for item in str_bytes:
read_buffer.extend(dic[item])
buffer_size = buffer_size + 8
read_buffer = read_buffer[0: buffer_size - padding:]
buffer_size = buffer_size - padding
write_buffer = bytearray([])
current = node_lst[0]
for pos in tqdm(range(0, buffer_size, 8), unit='byte', disable=not visualize):
for item in read_buffer[pos:pos + 8]:
# 根據二進制數移動current
if item:
current = current.rchild
else:
current = current.lchild
# 到達葉結點,打印字符並重置current
if current.lchild is None and current.rchild is None:
write_buffer.extend(current.value)
current = node_lst[0]
return bytes(write_buffer)
@staticmethod
def encode(str_bytes: bytes, huffman_dic: Dict[bytes, str], visualize: bool = False) -> Tuple[bytes, int]:
"""Huffman編碼
輸入待編碼文本, Huffman字典huffman_dic
返回末端填充位數padding和編碼后的文本
"""
bin_buffer = ''
padding = 0
# 生成整數->bytes的字典
dic = [int_to_bytes(item) for item in range(256)]
# 將bytes字符串轉化成bytes列表
read_buffer = [dic[item] for item in str_bytes]
write_buffer = bytearray([])
# 循環讀入數據,同時編碼輸出
for item in tqdm(read_buffer, unit='byte', disable=not visualize):
bin_buffer = bin_buffer + huffman_dic[item]
while len(bin_buffer) >= 8:
write_buffer.append(int(bin_buffer[:8:], 2))
bin_buffer = bin_buffer[8::]
# 將緩沖區內的數據填充后輸出
if bin_buffer:
padding = 8 - len(bin_buffer)
bin_buffer = bin_buffer.ljust(8, '0')
write_buffer.append(int(bin_buffer, 2))
return bytes(write_buffer), padding
class OVO:
VERBOSE = 0b10 # -v 顯示進度
@classmethod
def decode_as_huffman(cls, str_bytes: bytes, mode: int):
"""以huffman編碼解碼
輸入byte串,返回解碼后的byte串"""
padding = str_bytes[0]
max_length = str_bytes[1]
length = list(str_bytes[2:2 + max_length:])
char_num = sum(length)
# 如果length全零,那么表示256個字符全在同一層
if char_num == 0 and max_length != 0:
char_num = 256
length[max_length - 1] = 256
# 計算出還原huffman碼表所需的信息
char_lst, length_lst = [], []
for pos in range(2 + max_length, 2 + max_length + char_num):
char_lst.append(int_to_bytes(str_bytes[pos]))
for i in range(max_length):
length_lst.extend([i + 1] * length[i])
# 重構碼表
code_dic = Huffman.rebuild(char_lst, length_lst)
# huffman解碼
str_bytes = str_bytes[2 + max_length + char_num::]
write_buffer = Huffman.decode(str_bytes, code_dic, padding, bool(mode & cls.VERBOSE))
return write_buffer
@classmethod
def encode_as_huffman(cls, str_bytes: bytes, mode: int):
"""以huffman編碼的形式編碼文件
輸入bytes串,返回編碼后的比特串"""
fre_dic = Huffman.bytes_fre(str_bytes)
code_dic = Huffman.build(fre_dic)
code_dic = Huffman.to_canonical(code_dic)
max_length = 0
for code in code_dic.values():
max_length = max(max_length, len(code))
length_lst = [0 for _ in range(max_length + 1)]
for code in code_dic.values():
length_lst[len(code)] += 1
# 要是256個字符全部位於同一層,使用全零標記
if length_lst[max_length] == 256:
length_lst[max_length] = 0
length_lst.pop(0) # 碼長為0的字符並不存在,故刪去
# 將碼表信息轉化成bytes類型
code_bytes = b''.join(code_dic.keys())
length_bytes = b''.join(map(int_to_bytes, length_lst))
# huffman編碼
temp_buffer, padding = Huffman.encode(str_bytes, code_dic, bool(mode & cls.VERBOSE))
# 合並結果
code_data = int_to_bytes(max_length) + length_bytes + code_bytes
write_buffer = int_to_bytes(padding) + code_data + temp_buffer
return write_buffer
@classmethod
def decode(cls, source_path: str, target_path: str, mode: int = 0):
with open(source_path, 'rb') as fp_in:
with open(target_path, 'wb') as fp_out:
write_buffer = cls.decode_as_huffman(fp_in.read(), mode)
fp_out.write(write_buffer)
return target_path
@classmethod
def encode(cls, source_path: str, target_path: str, mode: int = 0):
with open(source_path, 'rb') as fp_in:
with open(target_path, 'wb') as fp_out:
write_buffer = cls.encode_as_huffman(fp_in.read(), mode)
fp_out.write(write_buffer)
return target_path
3.6.2 enc.py
from OVO import OVO
import argparse
parser = argparse.ArgumentParser()
parser.add_argument('source_path', type=str,
help='the file_path which you want to encode')
parser.add_argument('target_path', type=str, default=None,
help='the file_path which you want to output')
parser.add_argument('-v', '--verbose', help='increase output verbosity',
action='store_const', const=OVO.VERBOSE, default=0)
args = parser.parse_args()
mode = args.verbose
try:
res = OVO.encode(source_path=args.source_path, target_path=args.target_path, mode=mode)
print('\n{} has been encoded to {}'.format(args.source_path, res))
except FileNotFoundError as e:
print(e)
except OSError as e:
print(e)
print('check your path!')
except Exception as e:
print(e)
3.6.3 dec.py
from OVO import OVO
import argparse
parser = argparse.ArgumentParser()
parser.add_argument('source_path', type=str,
help='the file_path which you want to encode')
parser.add_argument('target_path', type=str, default=None,
help='the file_path which you want to output')
parser.add_argument('-v', '--verbose', help='increase output verbosity',
action='store_const', const=OVO.VERBOSE, default=0)
args = parser.parse_args()
mode = args.verbose
try:
res = OVO.decode(source_path=args.source_path, target_path=args.target_path, mode=mode)
print('\n{} has been encoded to {}'.format(args.source_path, res))
except FileNotFoundError as e:
print(e)
except OSError as e:
print(e)
print('check your path!')
except Exception as e:
print(e)
