import datetime import socket import json import re from OpenSSL import SSL import logging """ pip install pyOpenSSL """ def choice2dict(choices): """ :param choices: [('a', '1'), ('b', '2'),] :return: """ trans = dict() for key, value in choices: if isinstance(key, bytes): key = key.decode() if isinstance(value, bytes): value = value.decode() trans[str(key)] = value return trans # 該類可以實現基本功能, 但優化空間非常大 class SSLCertificateAnalyzer(object): def __init__(self, host: str, port: int, domain: str, purpose='extranet'): self._result = {} self.host = host self.port = port self.domain = domain self.purpose = purpose self.sock = None def get_result(self): return self._result def doing(self): try: context = SSL.Context(SSL.SSLv23_METHOD) s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) s.settimeout(15) self.sock = SSL.Connection(context=context, socket=s) self.sock.connect((self.host, self.port)) self.sock.setblocking(1) self.sock.do_handshake() self.construct_result() except Exception as e: logging.error(e) self._result = { 'issuer_cn': '', 'cert_type': '', 'due_date': '', 'serial_number': '', 'subject_cn': '', 'has_expired': '', 'extension_subject_dns': '', 'cn_matched': '', 'cert_chain': '', 'check_all': 'Connect to the server failed' } def construct_result(self): x509 = self.sock.get_peer_certificate() result = self.analysis_certificate_x509(x509) cert_chain_json = self.get_peer_cert_chain() self._result.update(result) self._result['cert_chain'] = cert_chain_json if self._result['has_expired'] == 'Yes': self._result['check_all'] = 'Expired' else: self._result['check_all'] = 'Good' if not self._result['cert_chain']: self._result['check_all'] += ', No certificate chain' if self._result['cn_matched'] == 'No': self._result['check_all'] += ', CN not matched' if self._result['cert_type'] == 'self-signed' and self.purpose == 'extranet': self._result['check_all'] += ', self-signed' def get_peer_cert_chain(self): cert_chain = self.sock.get_peer_cert_chain() cert_chain_result = list() for x509 in cert_chain: result = self.analysis_certificate_x509(x509) cert_chain_result.append(result) return json.dumps(cert_chain_result) if cert_chain_result else '' def get_cn_matched(self, flag, subject_cn): if subject_cn: _convert = re.sub(r'\*\.', '.*?[.]', subject_cn) pattern = '(' + _convert + ')(.*)' if re.search(pattern, self.domain, re.I): flag = 'Yes' return flag def analysis_certificate_x509(self, x509): cn_matched = 'No' result = { 'issuer_cn': '', 'cert_type': '', 'due_date': '', 'serial_number': 'serial_number', 'subject_cn': '', 'has_expired': 'No', 'extension_subject_dns': '', 'cn_matched': cn_matched } try: issuer = choice2dict(x509.get_issuer().get_components()) issuer_cn = issuer['CN'] if 'CN' in issuer else '' if 'lenovo' in issuer_cn.lower() or 'local' in issuer_cn.lower(): cert_type = 'self-signed' else: cert_type = 'commercial' result['issuer_cn'] = issuer_cn result['cert_type'] = cert_type except Exception as e: logging.warning(e) try: subject = choice2dict(x509.get_subject().get_components()) result['subject_cn'] = subject['CN'] if 'CN' in subject else '' cn_matched = self.get_cn_matched(cn_matched, result['subject_cn']) except Exception as e: logging.warning(e) try: not_after = x509.get_notAfter() # due_date = datetime.datetime.strptime(not_after[:14], '%Y%m%d%H%M%S') due_date = datetime.datetime( year=int(not_after[0:4]), month=int(not_after[4:6]), day=int(not_after[6:8]), hour=int(not_after[8:10]), minute=int(not_after[10:12]), second=int(not_after[10:12]) ) if isinstance(due_date, datetime.datetime): result['due_date'] = due_date.strftime('%Y-%m-%d %H:%M:%S') if isinstance(due_date, datetime.date): result['due_date'] = due_date.strftime('%Y-%m-%d %H:%M:%S') except Exception as e: pass result['cn_matched'] = 'No' result['has_expired'] = 'Yes' result['serial_number'] = '' try: x509_extension = x509.get_extension(0) x509_extension_short_name = x509_extension.get_short_name() if x509_extension_short_name == 'subjectAltName': try: x509_extension_subject_alt_name = x509_extension._subjectAltNameString() if ':' in x509_extension_subject_alt_name: extension_subject_dns = x509_extension_subject_alt_name.split(':')[1].strip() result['extension_subject_dns'] = extension_subject_dns cn_matched = self.get_cn_matched(cn_matched, extension_subject_dns) except Exception as e: logging.warning(e) result['serial_number'] = str(x509.get_serial_number()) result['has_expired'] = 'Yes' if x509.has_expired() else 'No' result['cn_matched'] = cn_matched except Exception as e: logging.error(e) return result if __name__ == '__main__': sca = SSLCertificateAnalyzer(host='118.112.225.33', port=443, domain='baidu.com', purpose='extranet') sca.doing() print(sca.get_result())