微信開放平台_消息的加解密_WXBizMsgCrypt.py文件_python3修改版


微信開放平台鏈接地址:https://open.weixin.qq.com/cgi-bin/applist?t=manage/list&page=0&num=20&openapptype=2048&token=bda7aa0567c6b511f03ed0070a0e74c1dad3703a&lang=zh_CN

微信開放平台官方提供的消息加解密_WXBizMsgCrypt.py文件是python2 版本,所以對於用python3開發的來說需要做部分修改:以下就是WXBizMsgCrypt.py經過python3部分修改后的實戰代碼

python2 版本的WXBizMsgCrypt.py文件可以直接去微信開放平台點擊下載!!!!!!!!!!!!!!!!!!!!!!!!!

# 修改了部分代碼,原微信官方提供的加密解密代碼是python2版本,從python2->python3---->WXBizMsgCrypt.py文件

import base64
import string
import random
import hashlib
import time
import struct
from Crypto.Cipher import AES
import xml.etree.cElementTree as ET
import socket
import wxopen.ierror as ierror

""" AES加解密用 pycrypto """

class FormatException(Exception):
pass

def throw_exception(message, exception_class=FormatException):
"""my define raise exception function"""
raise exception_class(message)

class SHA1:
"""計算公眾平台的消息簽名接口"""
def getSHA1(self, token, timestamp, nonce, encrypt):
"""用SHA1算法生成安全簽名
@param token: 票據
@param timestamp: 時間戳
@param encrypt: 密文
@param nonce: 隨機字符串
@return: 安全簽名
"""
try:
token = token.decode()
sortlist = [token, timestamp, nonce, encrypt]
sortlist.sort()
sha = hashlib.sha1()
sha.update("".join(sortlist).encode("utf8"))
return ierror.WXBizMsgCrypt_OK, sha.hexdigest()
except Exception as e:
print(e)
return ierror.WXBizMsgCrypt_ComputeSignature_Error, None


class XMLParse(object):
"""提供提取消息格式中的密文及生成回復消息格式的接口"""

# xml消息模板
AES_TEXT_RESPONSE_TEMPLATE = """<xml><Encrypt><![CDATA[%(msg_encrypt)s]]></Encrypt><MsgSignature><![CDATA[%(msg_signaturet)s]]></MsgSignature><TimeStamp>%(timestamp)s</TimeStamp><Nonce><![CDATA[%(nonce)s]]></Nonce></xml>"""
def extract(self, xmltext):
"""提取出xml數據包中的加密消息
@param xmltext: 待提取的xml字符串
@return: 提取出的加密消息字符串
"""
try:
xml_tree = ET.fromstring(xmltext)
encrypt = xml_tree.find("Encrypt")
touser_name = xml_tree.find("ToUserName")
return ierror.WXBizMsgCrypt_OK, encrypt.text, touser_name.text
except Exception as e:
print(e)
return ierror.WXBizMsgCrypt_ParseXml_Error, None, None

def generate(self, encrypt, signature, timestamp, nonce):
"""生成xml消息
@param encrypt: 加密后的消息密文
@param signature: 安全簽名
@param timestamp: 時間戳
@param nonce: 隨機字符串
@return: 生成的xml字符串
"""
resp_dict = {
'msg_encrypt': encrypt,
'msg_signaturet': signature,
'timestamp': timestamp,
'nonce': nonce,
}
resp_xml = self.AES_TEXT_RESPONSE_TEMPLATE % resp_dict
return resp_xml


class PKCS7Encoder(object):
"""提供基於PKCS7算法的加解密接口"""

block_size = 32

def encode(self, text):
""" 對需要加密的明文進行填充補位
@param text: 需要進行填充補位操作的明文
@return: 補齊明文字符串
"""
text_length = len(text)
# 計算需要填充的位數
amount_to_pad = self.block_size - (text_length % self.block_size)
if amount_to_pad == 0:
amount_to_pad = self.block_size
# 獲得補位所用的字符
pad = chr(amount_to_pad).encode()
return text + pad * amount_to_pad

def decode(self, decrypted):
"""刪除解密后明文的補位字符
@param decrypted: 解密后的明文
@return: 刪除補位字符后的明文
"""
pad = ord(decrypted[-1])
if pad < 1 or pad > 32:
pad = 0
return decrypted[:-pad]


class Prpcrypt(object):
"""提供接收和推送給公眾平台消息的加解密接口"""

def __init__(self, key):
# self.key = base64.b64decode(key+"=")
self.key = key
# 設置加解密模式為AES的CBC模式
self.mode = AES.MODE_CBC

def encrypt(self, text, appid):
"""對明文進行加密
@param text: 需要加密的明文
@return: 加密得到的字符串
"""
# 16位隨機字符串添加到明文開頭
len_str = struct.pack("I", socket.htonl(len(text.encode())))
# text = self.get_random_str() + binascii.b2a_hex(len_str).decode() + text + appid
text = self.get_random_str() + len_str + text.encode() + appid
# 使用自定義的填充方式對明文進行補位填充
pkcs7 = PKCS7Encoder()
text = pkcs7.encode(text)
# 加密
cryptor = AES.new(self.key, self.mode, self.key[:16])
try:
ciphertext = cryptor.encrypt(text)
# 使用BASE64對加密后的字符串進行編碼
return ierror.WXBizMsgCrypt_OK, base64.b64encode(ciphertext).decode('utf8')
except Exception as e:
return ierror.WXBizMsgCrypt_EncryptAES_Error, None

def decrypt(self, text, appid):
"""對解密后的明文進行補位刪除
@param text: 密文
@return: 刪除填充補位后的明文
"""
try:
cryptor = AES.new(self.key, self.mode, self.key[:16])
# 使用BASE64對密文進行解碼,然后AES-CBC解密
plain_text = cryptor.decrypt(base64.b64decode(text))
except Exception as e:
print(e)
return ierror.WXBizMsgCrypt_DecryptAES_Error, None
try:
# pad = ord(plain_text[-1])
pad = plain_text[-1]
# 去掉補位字符串
# pkcs7 = PKCS7Encoder()
# plain_text = pkcs7.encode(plain_text)
# 去除16位隨機字符串
content = plain_text[16:-pad]
xml_len = socket.ntohl(struct.unpack("I", content[: 4])[0])
xml_content = content[4: xml_len + 4]
from_appid = content[xml_len + 4:]
except Exception as e:
return ierror.WXBizMsgCrypt_IllegalBuffer, None
if from_appid != appid:
return ierror.WXBizMsgCrypt_ValidateAppid_Error, None
return 0, xml_content.decode()

def get_random_str(self):
""" 隨機生成16位字符串
@return: 16位字符串
"""
rule = string.ascii_letters + string.digits
str = random.sample(rule, 16)
return "".join(str).encode()


class WXBizMsgCrypt(object):
# 構造函數
# @param sToken: 公眾平台上,開發者設置的Token
# @param sEncodingAESKey: 公眾平台上,開發者設置的EncodingAESKey
# @param sAppId: 企業號的AppId
def __init__(self, sToken, sEncodingAESKey, sAppId):
try:
self.key = base64.b64decode(sEncodingAESKey + "=")
assert len(self.key) == 32
except Exception:
throw_exception("[error]: EncodingAESKey unvalid !", FormatException)
# return ierror.WXBizMsgCrypt_IllegalAesKey)
self.token = sToken.encode()
self.appid = sAppId.encode()

def EncryptMsg(self, sReplyMsg, sNonce, timestamp=None):
# 將公眾號回復用戶的消息加密打包
# @param sReplyMsg: 企業號待回復用戶的消息,xml格式的字符串
# @param sTimeStamp: 時間戳,可以自己生成,也可以用URL參數的timestamp,如為None則自動用當前時間
# @param sNonce: 隨機串,可以自己生成,也可以用URL參數的nonce
# sEncryptMsg: 加密后的可以直接回復用戶的密文,包括msg_signature, timestamp, nonce, encrypt的xml格式的字符串,
# return:成功0,sEncryptMsg,失敗返回對應的錯誤碼None
pc = Prpcrypt(self.key)
ret, encrypt = pc.encrypt(sReplyMsg, self.appid)
if ret != 0:
return ret, None
if timestamp is None:
timestamp = str(int(time.time()))
# 生成安全簽名
sha1 = SHA1()
ret, signature = sha1.getSHA1(self.token, timestamp, sNonce, encrypt)

if ret != 0:
return ret, None
xmlParse = XMLParse()
return ret, xmlParse.generate(encrypt, signature, timestamp, sNonce)
def VerifyURL(self, sMsgSignature, sTimeStamp, sNonce, sEchoStr):
sha1 = SHA1()
ret,signature = sha1.getSHA1(self.token, sTimeStamp, sNonce, sEchoStr)
if ret != 0:
return ret, None
if not signature == sMsgSignature:
return ierror.WXBizMsgCrypt_ValidateSignature_Error, None
pc = Prpcrypt(self.key)
ret,sReplyEchoStr = pc.decrypt(sEchoStr,self.appid)
return ret,sReplyEchoStr

def DecryptMsg(self, sPostData, sMsgSignature, sTimeStamp, sNonce):
# 檢驗消息的真實性,並且獲取解密后的明文
# @param sMsgSignature: 簽名串,對應URL參數的msg_signature
# @param sTimeStamp: 時間戳,對應URL參數的timestamp
# @param sNonce: 隨機串,對應URL參數的nonce
# @param sPostData: 密文,對應POST請求的數據
# xml_content: 解密后的原文,當return返回0時有效
# @return: 成功0,失敗返回對應的錯誤碼
# 驗證安全簽名
xmlParse = XMLParse()
ret, encrypt,touser_name = xmlParse.extract(sPostData)
if ret != 0:
return ret, None
sha1 = SHA1()
ret, signature = sha1.getSHA1(self.token, sTimeStamp, sNonce, encrypt)
if ret != 0:
return ret, None
if not signature == sMsgSignature:
return ierror.WXBizMsgCrypt_ValidateSignature_Error, None
pc = Prpcrypt(self.key)
ret, xml_content = pc.decrypt(encrypt, self.appid)
return ret, xml_content


# 以下是微信開放平台提供的ierror.py文件

  #!/usr/bin/env python
  # -*- coding: utf-8 -*-
  #########################################################################
  # Author: jonyqin
  # Created Time: Thu 11 Sep 2014 01:53:58 PM CST
  # File Name: ierror.py
  #Description:定義錯誤碼含義
  #########################################################################
  WXBizMsgCrypt_OK = 0
  WXBizMsgCrypt_ValidateSignature_Error = -40001
  WXBizMsgCrypt_ParseXml_Error = -40002
  WXBizMsgCrypt_ComputeSignature_Error = -40003
  WXBizMsgCrypt_IllegalAesKey = -40004
  WXBizMsgCrypt_ValidateAppid_Error = -40005
  WXBizMsgCrypt_EncryptAES_Error = -40006
  WXBizMsgCrypt_DecryptAES_Error = -40007
  WXBizMsgCrypt_IllegalBuffer = -40008
  WXBizMsgCrypt_EncodeBase64_Error = -40009
  WXBizMsgCrypt_DecodeBase64_Error = -40010

  






免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM