Python RSA加解密文本文件


近來在使用python寫項目,特此記錄一下項目中遇到的文件加解密問題。
關於python版本的加密算法,隨便搜一搜還是可以檢索出來很多的,不過大都是同一篇文章在不同的平台來回發布,或者就是轉載,而且例舉的都是最簡單的情況,那么,實際項目中使用的話,肯定會比這個要稍微復雜一些,比如我的需求就是要加密一個使用mysqldump出來的數據庫腳本文件,直接拿網上的例子過來調用肯定是不行的,所以不得不自己研究了一番,特此記錄。

RSA算法

什么是RSA算法?

項目選型的算法是RSA非對稱加密算法,關於這個算法不做過多的解釋,咱們划重點:

  • 公鑰用於加密
  • 私鑰用於解密
  • len_in_byte(raw_data) = len_in_bit(key)/8 -11,如 1024bit 的密鑰,一次能加密的內容長度為 1024/8 -11 = 117 byte

為何要減去11個byte?

因為我們使用的是PKCS1Padding占用了11個byte,那么它能加密的明文長度就必須減去這11個byte

可能會遇到什么問題?

基於以上三點,我們大概可以知道要完成文件加解密,我們可能會遇到什么問題?

  • 一次性加密明文的長度是和密鑰長度有關系的,那么我們要加密一個文件,不能一次性將文本內容讀取出來,然后加密
  • 如果文件很大,我們也不可能將文件內容一次性讀取到內存當中,可能會直接導致服務器無法響應其他請求,這肯定是不合理的
  • 文本被加密之后,回頭解密,如果讀取的長度有差異勢必導致解密失敗,那么這個數據庫備份文件就廢了,這個就比較危險了

Do It

  1. 安裝依賴,python版本3.7.4

pip install pycryptodomex -i https://pypi.tuna.tsinghua.edu.cn/simple/

  1. 導入模塊:
import base64
from Cryptodome import Random
from Cryptodome.PublicKey import RSA
from Cryptodome.Cipher import PKCS1_v1_5 as Cipher_pkcs1_v1_5
from Cryptodome.Signature import PKCS1_v1_5 as Signature_pkcs1_v1_5
  1. 生成公鑰+私鑰,注意這里我們生成的公鑰長度是1024bit
# 偽隨機數生成器
random_generator = Random.new().read
# rsa算法生成實例
rsa = RSA.generate(1024, random_generator)
private_pem = str(rsa.exportKey(), encoding="utf-8")
with open("client-private.pem", "w") as f:
    f.write(private_pem)
    
public_pem = str(rsa.publickey().exportKey(), encoding="utf-8")
with open("client-public.pem", "w") as f:
    f.write(public_pem)'''
  1. 加密,這里對傳入的明文長度做了切分,因為我們生成的密鑰長度為1024bit,所以我們一次加密的明文長度不能超過117個byte

def rsa_encrypt(plaintext, pub_key):
    '''
    rsa 加密
    :param plaintext: 明文
    :param pub_key:公鑰
    '''
    message = plaintext.encode("utf-8")
    length = len(message)
    default_length = 117  # 1024/8 - 11 1024為密鑰長度
    rsakey = RSA.importKey(pub_key)
    cipher = Cipher_pkcs1_v1_5.new(rsakey)
    # 不需要切分
    if length <= default_length:
        return default_rsa_encrypt(cipher, message)
    # 需要切分
    offset = 0
    result = []
    while length - offset > 0:
        if length - offset > default_length:
            result.append(default_rsa_encrypt(
                cipher, message[offset:offset+default_length]))
        else:
            result.append(default_rsa_encrypt(cipher, message[offset:]))
        offset += default_length
    return "\n".join(result)
    
def default_rsa_encrypt(cipher, message):
    ciphertext = base64.b64encode(cipher.encrypt(message))
    # print(b"ciphertext:"+ciphertext)
    ciphertext_decode = ciphertext.decode("utf-8")
    # print("ciphertext_decode:"+ciphertext_decode)
    return ciphertext_decode
  1. 解密

def rsa_decrypt(ciphertext, priv_key):
    '''
    rsa 解密
    :param ciphertext:密文
    :param priv_key:私鑰
    '''
    message = base64.b64decode(ciphertext)
    length = len(message)
    default_length = 128
    rsakey = RSA.importKey(priv_key)
    cipher = Cipher_pkcs1_v1_5.new(rsakey)
    if length <= default_length:
        return default_rsa_decrypt(cipher, message)
    # 需要分段
    offset = 0
    result = []
    while length - offset > 0:
        if length - offset > default_length:
            result.append(rsa_decrypt(
                cipher, message[offset:offset+default_length]))
        else:
            result.append(rsa_decrypt(cipher, message[offset:]))
        offset += default_length
    decode_message = [x.decode("utf-8") for x in result]
    return "".join(decode_message)
    
def default_rsa_decrypt(cipher, message):
    plaintext = cipher.decrypt(message, random_generator)
    # print(b"plaintext:"+plaintext)
    plaintext_decode = plaintext.decode("utf-8")
    # print("plaintext_decode:"+plaintext_decode)
    return plaintext_decode
  1. 加解密文件,考慮開頭我們提出的問題,采用了逐行讀取,逐行加密,加密后密文也逐行寫入
def rsa_encrypt_file(file_path, save_path, pub_key):
    '''
    rsa 加密文件
    :param file_path:需要加密文件路徑
    :param save_path:加密之后存放的文件路徑
    :param pub_key:公鑰
    '''
    with open(file_path, "r", encoding="utf-8") as f:
        line = f.readline()  # 讀取一行
        while line:
            context = rsa_encrypt(line, pub_key)  # 加密切割后的字符
            with open(save_path, "a", encoding="utf-8") as w:
                w.write(context+"\n")
        line = f.readline()
def rsa_decrypt_file(file_path,save_path,priv_key):
    '''
    rsa 解密文件
    :file_path:需要解密的文件路徑
    :save_path:解密之后存放的文件路徑
    :priv_key:私鑰
    '''
    with open(file_path,"r",encoding="utf-8") as f:
        line = f.readline()
        while line:
            context = rsa_decrypt(line.strip("\n"),priv_key)
            with open(save_path,"a",encoding="utf-8") as w:
                w.write(context)
            line = f.readline()
  1. 測試,一開始我使用的是自己隨便輸入的一行很長的數字文本,親測沒有問題,但是當我直接使用我的數據庫腳本文件的時候,加密可以成功,但是會遇到解密后解碼失敗的情況,當時百思不得其解,我以為是字符集的問題,於是我將utf-8,換成了gb2312,加解密成功了,當時心花怒放,直到我重新加解密了另一個備份文件,又遇到解碼失敗,當時就睡不着覺了~

直到我看到了這句話不完整的多字節序列(incomplete multibyte sequence)我瞬間明白了,因為我的腳本文件中含有中文,utf8 編碼一個漢字是3個byte,gb2312編碼一個漢字是2個byte,只要是多字節,那么做切割的時候,就有可能一個漢字被切割成了兩部分,那么自然會導致無法解碼成正確的漢字了,問題已經明了,就看怎么解決了。

因為是腳本文件,處理不好就有可能導致腳本執行失敗,最終導致數據庫還原失敗,這就違背項目初衷了~

所以我想了一個辦法,先對每一行文本做字符編碼判斷,超過了117,最后一個字符就不累計上去,代碼如下:

def cut_string(message,length = 117):
    result = []
    temp_char = []
    for msg in message:#遍歷每一個字符
        msg_encode = msg.encode("utf-8")#對每一個字符編碼
        temp_encode = "".join(temp_char).encode("utf-8")#累計編碼之后的字節數
        if len(temp_encode) + len(msg_encode) <= length:#如果小於約定的長度,加添加入結果集
            temp_char.append(msg)
        else:#如果已經超過了約定的長度,就添加入下一個結果集
            result.append("".join(temp_char))
            temp_char.clear()
            temp_char.append(msg)
    result.append("".join(temp_char))
    return result

加密方法需要重新調整一下:


def rsa_encrypt_file(file_path,save_path,pub_key):
    '''
    rsa 加密文件
    :param file_path:需要加密文件路徑
    :param save_path:加密之后存放的文件路徑
    :param pub_key:公鑰
    '''
    with open(file_path,"r",encoding="utf-8") as f:
        line = f.readline() #讀取一行
        while line:
            cut_lines = cut_string(line) # 切割字符 保證漢字不被切割
            for cut_line in cut_lines:
                context = rsa_encrypt(cut_line,pub_key) #加密切割后的字符
                with open(save_path,"a",encoding="utf-8") as w:
                    w.write(context+"\n")
            line = f.readline()

到此問題就已經解決了,其實有了這個cut_string方法之后,之前寫的加解密方法中不需要再做切分,但是代碼保留。

2020-04-21 更新

上面的方法,加解密的效率非常的低,因為是逐行加解密,一個300M的腳本文件,加密完成耗時40分鍾,這個實在是太難受了,所以調整了策略,先壓縮再加密,所以就涉及到二進制文件的讀取與寫入,最后的實現代碼如下:

def rsa_encrypt_binfile(file_path,save_path,pub_key):
    '''
    rsa 加密二進制文件
    :param file_path:需要加密文件路徑
    :param save_path:加密之后存放的文件路徑
    :param pub_key:公鑰
    '''
    with open(file_path, 'rb') as f:
        message = f.read()
    length = len(message)
    default_length = 117  # 1024/8 - 11 1024為密鑰長度
    rsakey = RSA.importKey(pub_key)
    cipher = Cipher_pkcs1_v1_5.new(rsakey)
    # 不需要切分
    result = []
    if length <= default_length:
        result.append(base64.b64encode(cipher.encrypt(message)))

    # 需要切分
    offset = 0
    while length - offset > 0:
        if length - offset > default_length:
            result.append(base64.b64encode(cipher.encrypt(message[offset:offset+default_length])))
        else:
            result.append(base64.b64encode(cipher.encrypt(message[offset:])))
        offset += default_length
    
    with open(save_path,"ab+") as w:
        for ciphertext in result:
            ciphertext += b"\n"
            w.write(ciphertext)

def rsa_decrypt_binfile(file_path,save_path,priv_key):
    '''
    rsa 解密二進制文件
    :file_path:需要解密的文件路徑
    :save_path:解密之后存放的文件路徑
    :priv_key:私鑰
    '''
    with open(file_path,"rb") as f:
        line = f.readline()
        while line:
            message = base64.b64decode(line.strip(b"\n"))
            rsakey = RSA.importKey(priv_key)
            cipher = Cipher_pkcs1_v1_5.new(rsakey)
            plaintext = cipher.decrypt(message, random_generator)
            with open(save_path, 'ab+') as w: #追加寫入
                w.write(plaintext)
            line = f.readline()


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM