radius挑戰認證+對接華為防火牆實現SSLvpn +釘釘 雙因子認證2


上文,我們簡單實現了 使用pyrad 搭建radius對接防火牆挑戰認證 但是挑戰碼只在控制台輸出就有點不太合適,我們接下來使用python對接釘釘,及時的把動態碼發給釘釘使用

不說廢話 上代碼

# -*- coding: UTF-8 -*-
from pyrad import *
import socket
import pyrad.host
import random
import pymysql
from dingtalk import *
import requests
import json
BUFSIZE = 1024
KEY = b"testing123456789"
CHALLENGE = "666" #挑戰碼初始為666
appkey='xxx'#釘釘的appkey
appsecret='xxxx' #釘釘的appsecret 在釘釘的管理后台這兩個參數都能找到
class RadiusServer(pyrad.host.Host):
    def __init__(self):
        dict = pyrad.dictionary.Dictionary("dictionary") #從通用的字典使用
        pyrad.host.Host.__init__(self, dict=dict)
    def get_challenge(self): #產生一個4字節的隨機挑戰碼
        challenge = ""  #初始化挑戰碼
        challenge = challenge + str(chr(random.randint(65,90)))
        challenge = challenge + str(chr(random.randint(65,90)))
        challenge = challenge + str(chr(random.randint(65,90)))
        challenge = challenge + str(chr(random.randint(65,90)))
        return challenge
    def get_token(self):
        url = 'https://oapi.dingtalk.com/gettoken?appkey='+appkey+'&appsecret='+appsecret
        response = requests.get(url=url)
        result = response.json()
        errmsg = result['errmsg']
        print('獲取token是否成功:',errmsg)
        try:
            access_token = result['access_token']
        except Exception as e:
            print(e)
            access_token = ''
        return access_token                 
    def check_pass(self, radpkt): 
        global CHALLENGE
        global access_token1
        conn=pymysql.connect(
            host='xxxx',
            port=3306,   
            user='root',
            password='xxxx',
            db='xxxx',#這都是數據庫的參數 按照你實際的情況來
            charset='utf8'
        )
         # 拿到游標
        cursor=conn.cursor()
        user=radpkt["User-Name"][0]
        print(user)
        password=radpkt["User-Password"][0]
        print(password)
         # 執行sql語句 
        sql='select * from test where user = "%s" '% (user)
        res=cursor.execute(sql)
        pwd=cursor.fetchall()
        cursor.close()
        conn.close()     
        if radpkt.PwCrypt(CHALLENGE) == radpkt["User-Password"][0]:
            radpkt.code = packet.AccessAccept
            CHALLENGE = self.get_challenge() #挑戰碼使用過后就更換掉
            radpkt.AddAttribute("Reply-Message","PASSCODE Accept")
            print("AccessAccept")                
        elif radpkt.PwCrypt(pwd[0][1]) == radpkt["User-Password"][0]:
            radpkt.code = packet.AccessChallenge
            CHALLENGE = self.get_challenge()
            radpkt.AddAttribute("Reply-Message","Enter Token Code") 
            radpkt.AddAttribute("State",b'0x1122123231') #最好這個還是隨機生成,好吧我有點偷懶
            #下面發送驗證碼
            access_token1 = self.get_token()
            userid=pwd[0][2]
            print('釘釘id為:',pwd[0][2])
            url = 'https://oapi.dingtalk.com/topapi/message/corpconversation/asyncsend_v2?access_token='+access_token1
            content = '你好,VPN驗證碼為'+CHALLENGE
            _neirong = {
                     'agent_id':'xxxx',#應用id釘釘后台查看
                     'userid_list':userid,
                     'msg':{
                            'msgtype':'text',
                            'text':{'content':content}}
                        }
            neirong = json.dumps(_neirong)
            response = requests.post(url=url,data=neirong)
            result = response.json()
            print(result)            
        else:
            radpkt.code = packet.AccessReject
            print("AccessReject")
    def get_pkt(self, pkt):
        get_pw = None
        get_name = None
        radpkt = self.CreateAuthPacket(packet=pkt) #解析請求報文
        #radpkt.code = packet.AccessChallenge
        radpkt.secret = KEY
        print("輸出相關參數")
        for key in radpkt.keys():        
            print(key, radpkt[key])
            if key == "User-Password":
                get_pw = 1
            if key == "User-Name":
                get_name = 1       
        if 1 == get_pw and 1 == get_name:
            self.check_pass(radpkt)        
        return radpkt.ReplyPacket()         
ip_port = ('', 1812)
server = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)  # udp協議
server.bind(ip_port)


while True:
    data,client_addr = server.recvfrom(BUFSIZE)
    srv = RadiusServer()
    reply = srv.get_pkt(data)
    server.sendto(reply, client_addr)    

不得不說一下 釘釘官方的sdk還是python 2 有點小坑  。但是手冊寫的很清楚,參考手冊還是能搞明白的。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM