上文,我們簡單實現了 使用pyrad 搭建radius對接防火牆挑戰認證 但是挑戰碼只在控制台輸出就有點不太合適,我們接下來使用python對接釘釘,及時的把動態碼發給釘釘使用
不說廢話 上代碼
# -*- coding: UTF-8 -*- from pyrad import * import socket import pyrad.host import random import pymysql from dingtalk import * import requests import json BUFSIZE = 1024 KEY = b"testing123456789" CHALLENGE = "666" #挑戰碼初始為666 appkey='xxx'#釘釘的appkey appsecret='xxxx' #釘釘的appsecret 在釘釘的管理后台這兩個參數都能找到 class RadiusServer(pyrad.host.Host): def __init__(self): dict = pyrad.dictionary.Dictionary("dictionary") #從通用的字典使用 pyrad.host.Host.__init__(self, dict=dict) def get_challenge(self): #產生一個4字節的隨機挑戰碼 challenge = "" #初始化挑戰碼 challenge = challenge + str(chr(random.randint(65,90))) challenge = challenge + str(chr(random.randint(65,90))) challenge = challenge + str(chr(random.randint(65,90))) challenge = challenge + str(chr(random.randint(65,90))) return challenge def get_token(self): url = 'https://oapi.dingtalk.com/gettoken?appkey='+appkey+'&appsecret='+appsecret response = requests.get(url=url) result = response.json() errmsg = result['errmsg'] print('獲取token是否成功:',errmsg) try: access_token = result['access_token'] except Exception as e: print(e) access_token = '' return access_token def check_pass(self, radpkt): global CHALLENGE global access_token1 conn=pymysql.connect( host='xxxx', port=3306, user='root', password='xxxx', db='xxxx',#這都是數據庫的參數 按照你實際的情況來 charset='utf8' ) # 拿到游標 cursor=conn.cursor() user=radpkt["User-Name"][0] print(user) password=radpkt["User-Password"][0] print(password) # 執行sql語句 sql='select * from test where user = "%s" '% (user) res=cursor.execute(sql) pwd=cursor.fetchall() cursor.close() conn.close() if radpkt.PwCrypt(CHALLENGE) == radpkt["User-Password"][0]: radpkt.code = packet.AccessAccept CHALLENGE = self.get_challenge() #挑戰碼使用過后就更換掉 radpkt.AddAttribute("Reply-Message","PASSCODE Accept") print("AccessAccept") elif radpkt.PwCrypt(pwd[0][1]) == radpkt["User-Password"][0]: radpkt.code = packet.AccessChallenge CHALLENGE = self.get_challenge() radpkt.AddAttribute("Reply-Message","Enter Token Code") radpkt.AddAttribute("State",b'0x1122123231') #最好這個還是隨機生成,好吧我有點偷懶 #下面發送驗證碼 access_token1 = self.get_token() userid=pwd[0][2] print('釘釘id為:',pwd[0][2]) url = 'https://oapi.dingtalk.com/topapi/message/corpconversation/asyncsend_v2?access_token='+access_token1 content = '你好,VPN驗證碼為'+CHALLENGE _neirong = { 'agent_id':'xxxx',#應用id釘釘后台查看 'userid_list':userid, 'msg':{ 'msgtype':'text', 'text':{'content':content}} } neirong = json.dumps(_neirong) response = requests.post(url=url,data=neirong) result = response.json() print(result) else: radpkt.code = packet.AccessReject print("AccessReject") def get_pkt(self, pkt): get_pw = None get_name = None radpkt = self.CreateAuthPacket(packet=pkt) #解析請求報文 #radpkt.code = packet.AccessChallenge radpkt.secret = KEY print("輸出相關參數") for key in radpkt.keys(): print(key, radpkt[key]) if key == "User-Password": get_pw = 1 if key == "User-Name": get_name = 1 if 1 == get_pw and 1 == get_name: self.check_pass(radpkt) return radpkt.ReplyPacket() ip_port = ('', 1812) server = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) # udp協議 server.bind(ip_port) while True: data,client_addr = server.recvfrom(BUFSIZE) srv = RadiusServer() reply = srv.get_pkt(data) server.sendto(reply, client_addr)
不得不說一下 釘釘官方的sdk還是python 2 有點小坑 。但是手冊寫的很清楚,參考手冊還是能搞明白的。