[原創]django+ldap實現統一認證部分二(python-ldap實踐)


前言

接上篇文章 [原創]django+ldap實現統一認證部分一(django-auth-ldap實踐) 繼續實現我們的統一認證

python-ldap

我在sso項目的backend/lib/common/下添加一個ldaphelper.py文件,其中定義一個類

#!/usr/bin/env python
# -*- coding:utf-8 -*-

import ldap
import ldap.modlist as modlist

# 加載log配置
import logging

logger = logging.getLogger()

import sys

reload(sys)
sys.setdefaultencoding('utf8')

'''
實現LDAP用戶登錄驗證,首先獲取用戶的dn,然后再驗證用戶名和密碼
'''
from SSOadmin import settings
# 登陸 地址
LDAP_URI = settings.AUTH_LDAP_SERVER_URI
# 登陸 賬戶
LDAP_USER = settings.AUTH_LDAP_BIND_DN
# 登陸 密碼
LDAP_PASS = settings.AUTH_LDAP_BIND_PASSWORD
# 默認 區域
BASE_DN = settings.base_dn


class LDAPTool(object):
    def __init__(self,
                 ldap_uri=None,
                 base_dn=None,
                 user=None,
                 password=None):
        """
        初始化
        :param ldap_uri: ldap_uri
        :param base_dn: 區域
        :param user: 默認用戶
        :param password: 默認密碼
        :return:
        """
        if not ldap_uri:
            ldap_uri = LDAP_URI
        if not base_dn:
            self.base_dn = BASE_DN
        if not user:
            self.admin_user = LDAP_USER
        if not password:
            self.admin_password = LDAP_PASS
        try:
            self.ldapconn = ldap.initialize(ldap_uri)  # 老版本使用open方法
            self.ldapconn.simple_bind(self.admin_user, self.admin_password)  # 綁定用戶名、密碼
        except ldap.LDAPError, e:
            logger.error('ldap conn失敗,原因為: %s' % str(e))

    def ldap_search_dn(self, value=None, value_type='uid'):
        """
        # 根據表單提交的用戶名,檢索該用戶的dn,一條dn就相當於數據庫里的一條記錄。
        # 在ldap里類似cn=username,ou=users,dc=gccmx,dc=cn,驗證用戶密碼,必須先檢索出該DN
        :param value: 用戶 uid或 組cn
        :param value_type: 用戶 uid|cn
        :return: search result
        """
        obj = self.ldapconn
        obj.protocal_version = ldap.VERSION3
        searchScope = ldap.SCOPE_SUBTREE
        retrieveAttributes = None
        if value_type == 'cn':
            searchFilter = "cn=" + value
        else:
            searchFilter = "uid=" + value
        try:
            ldap_result_id = obj.search(
                base=self.base_dn,
                scope=searchScope,
                filterstr=searchFilter,
                attrlist=retrieveAttributes
            )
            result_type, result_data = obj.result(ldap_result_id, 0)
            if result_type == ldap.RES_SEARCH_ENTRY:
                return result_data
            else:
                return None
        except ldap.LDAPError, e:
            logger.error('ldap search %s 失敗,原因為: %s' % (value, str(e)))

    def ldap_get_user(self, uid=None):
        """
        通過查詢用戶uid,從ldap_search_dn進一步提取所需數據,search到的是全部信息
        :param uid:
        :return: {‘uid’:'zhangsan','mail':'zhangsan@xxx.com','cn':'張三'}
        """
        result = None
        try:
            search = self.ldap_search_dn(value=uid, value_type=uid)
            if search is None:
                raise ldap.LDAPError('未查詢到相應 id')
            for user in search:
                if user[1]['uid'][0] == uid:
                    result = {
                        'uid': uid,
                        'mail': user[1]['mail'][0],
                        'cn': user[1]['cn'][0],
                    }
        except Exception, e:
            logger.error('獲取用戶%s 失敗,原因為: %s' % (uid, str(e)))
        return result

    def __ldap_getgid(self, cn="員工"):
        """
        查詢 組cn對應的gid
        :param cn: 組cn
        :return: 對應cn的gidNumber
        """
        obj = self.ldapconn
        obj.protocal_version = ldap.VERSION3
        searchScope = ldap.SCOPE_SUBTREE
        retrieveAttributes = None
        searchFilter = "cn=" + cn
        try:
            ldap_result_id = obj.search(
                base="ou=Group,%s" % self.base_dn,
                scope=searchScope,
                filterstr=searchFilter,
                attrlist=retrieveAttributes
            )
            result_type, result_data = obj.result(ldap_result_id, 0)
            if result_type == ldap.RES_SEARCH_ENTRY:
                return result_data[0][1].get('gidNumber')[0]
            else:
                return None
        except ldap.LDAPError, e:
            logger.error('獲取gid失敗,原因為: %s' % str(e))

    def __get_max_uidNumber(self):
        """
        查詢 當前最大的uid,這個是在添加用戶時,用於自增uid
        :param: None
        :return: max uidNumber
        """
        obj = self.ldapconn
        obj.protocal_version = ldap.VERSION3
        searchScope = ldap.SCOPE_SUBTREE
        retrieveAttributes = ['uidNumber']
        searchFilter = "uid=*"

        try:
            ldap_result = obj.search(
                base="ou=People,%s" % self.base_dn,
                scope=searchScope,
                filterstr=searchFilter,
                attrlist=retrieveAttributes
            )
            result_set = []
            while True:
                result_type, result_data = obj.result(ldap_result, 0)
                if not result_data:
                    break
                else:
                    if result_type == ldap.RES_SEARCH_ENTRY:
                        result_set.append(int(result_data[0][1].get('uidNumber')[0]))
            return max(result_set) + 1
        except ldap.LDAPError, e:
            logger.error('獲取最大uid失敗,原因為: %s' % str(e))

    def ldap_add_user(self, cn, mail, username, password):
        """
        添加ldap用戶
        :param cn: 中文名, mail: 郵箱, username: 用戶名, password: 密碼
        :return: True/None
        """
        result = None
        try:
            obj = self.ldapconn
            obj.protocal_version = ldap.VERSION3

            addDN = "uid=%s,ou=People,%s" % (username, BASE_DN)
            attrs = {}
            attrs['objectclass'] = ['top', 'person', 'inetOrgPerson', 'posixAccount', 'organizationalPerson']
            attrs['cn'] = str(cn)
            attrs['homeDirectory'] = str('/home/%s' % username)
            attrs['loginShell'] = '/bin/bash'
            attrs['mail'] = str(mail)
            attrs['sn'] = str(username)
            attrs['uid'] = str(username)
            attrs['userPassword'] = str(password)
            attrs['uidNumber'] = str(self.__get_max_uidNumber())
            attrs['gidNumber'] = self.__ldap_getgid(cn='員工')
            ldif = ldap.modlist.addModlist(attrs)
            obj.add_s(addDN, ldif)
            obj.unbind_s()
            result = True
        except ldap.LDAPError, e:
            logger.error("生成用戶%s 失敗,原因為: %s" % (username, str(e)))
        return result

    def check_user_belong_to_group(self, uid, group_cn='員工'):
        """
        查詢 用戶 是否歸屬於某個組
        :param uid: 用戶uid , Ex: 'ssoadmin'
        :param group_cn: 歸屬組cn , Ex: '黑名單'
        :return: True|None
        """
        result = None
        try:
            search = self.ldap_search_dn(value=group_cn, value_type='cn')
            if search is None:
                raise ldap.LDAPError('未查詢到相應 id')

            member_list = search[0][1].get('memberUid', [])
            if uid in member_list:
                result = True
        except ldap.LDAPError, e:
            logger.error('獲取用戶%s與組%s關系失敗,原因為: %s' % (uid, group_cn, str(e)))
        return result

    def check_user_status(self, uid):
        """
        驗證用戶狀態
        :param uid: 用戶uid
        :return: 200: 用戶可用
                 404: 用戶不存在
                 403: 用戶被禁用
        """
        result = 404
        try:
            target_cn = self.ldap_get_user(uid=uid)
            if target_cn is None:  # 如未查到用戶,記錄日志,但不算錯誤,后邊有很多地方會驗證用戶是否存在
                result = 404
                logger.debug("%s uid未查詢到" % uid)
            else:
                if self.check_user_belong_to_group(uid=uid, group_cn='黑名單'):
                    result = 403
                else:
                    result = 200
        except ldap.LDAPError, e:
            logger.error("%s 檢查用戶狀態失敗,原因為: %s" % (uid, str(e)))
        return result

    def ldap_update_password(self, uid, new_password):
        """
        更新密碼
        :param uid: 用戶uid,新password
        :return: True|None
        """
        result = None
        try:
            obj = self.ldapconn
            obj.protocal_version = ldap.VERSION3
            modifyDN = "uid=%s,ou=People,%s" % (uid, BASE_DN)
            # 因為是更新密碼,如用passwd_s方法需要oldpassword,如果用下邊方法,是增加一個新密碼,而不是替換,而我們的需求是重置密碼
            # old_password = {'userPassword': ''}
            # new_password = {'userPassword': new_password}
            # ldif = modlist.modifyModlist(old_password, new_password)
            # obj = modlist.modifyModlist(modifyDN, ldif)
            # 以下方法實現密碼替換的效果,第二個參數就是要替換的屬性名,可以變更其他屬性
            obj.modify_s(modifyDN, [(ldap.MOD_REPLACE, 'userPassword', [str(new_password)])])
            obj.unbind_s()
            result = True
        except ldap.LDAPError, e:
            logger.error("%s 密碼更新失敗,原因為: %s" % (uid, str(e)))
        return result

# for test
def main():
    # print type(LDAPTool().get_max_uidNumber())
    # print(LDAPTool().ldap_search_dn(value='qudong'))
    # print(LDAPTool().check_user_belong_to_group('qudong', '黑名單'))
    # print LDAPTool().ldap_get_user(uid='qudong')
    # print(LDAPTool().check_user_and_email(uid='qudong', email='qudong@ssotest.com'))
    # s=LDAPTool()
    # print s.ldap_add_user('哈嘍2','test222@ssotest.com','test222','test222')
    # print(s._LDAPTool__ldap_getgid('黑名單'))
    # print(LDAPTool().ldap_update_password('qudong','111111'))
    pass

if __name__ == '__main__':
    main()

view調用方面,就沒什么太多說的了,就是把原先對本地數據庫的user表的增刪改查,移植到ldaphelper上即可,下邊以我的修改密碼的邏輯為例做以說明,其他的包括注冊、登錄都大同小異了。修改密碼還涉及到一個驗證碼發送和驗證的邏輯,可以用於參考
models.py,這里自定義了user表,參考Django admin定制化,User字段擴展[原創],自定義擴展user表,額外就多了一張驗證碼表

class MyUser(AbstractUser):
    name = models.CharField(u'中文名', max_length=32, blank=False, null=False)

    class Meta:
        verbose_name = u'用戶詳情'
        verbose_name_plural = u"用戶詳情"


class User_ex(models.Model):
    """User models ex"""
    email = models.EmailField(unique=True, blank=False, null=False)
    valid_code = models.CharField(max_length=24)   #驗證碼
    valid_time = models.DateTimeField(auto_now=True)   #驗證碼有效時間

    class Meta:
        verbose_name = u'驗證碼'
        verbose_name_plural = u"驗證碼"

    def __unicode__(self):
        return u'%s' % self.valid_code

獲取驗證碼的view,生成驗證碼,入庫,發送驗證碼郵件,發送郵件的方法send_mail在其他地方寫好了

from web_sso.forms.account import LoginForm, RegisterForm, ForgetPwdForm
from web_sso import models
from backend.lib.common.sendemail import send_mail
from backend.lib.common.ldaphelper import LDAPTool
import logging

......

def get_email_code(request):
    """get email code"""
    email = request.GET.get('email', '')
    type = request.GET.get('type', '')
    code = ''.join(random.sample(string.digits + string.letters, 4))
    data = {}
    data['success'] = False
    data['message'] = ''

    try:
        # 檢查郵箱
        username = email.rsplit('@', 1)[0]
        user_count = LDAPTool().check_user_status(uid=username)
        # user_count = models.MyUser.objects.filter(email=email).count()
        if type == "register":
            if user_count != 404:
                data['success'] = False
                data['message'] = u'此用戶%s已被注冊過' % username
                raise Exception(data['message'])
        elif type == "forget_pwd":
            if user_count == 404:
                data['success'] = False
                data['message'] = u'此郵箱未被注冊過'
                raise Exception(data['message'])
        # 檢查短時間內是否有生成過驗證碼
        user_ex = models.User_ex.objects.filter(email=email)
        if len(user_ex) > 0:
            user_ex = user_ex[0]

            # 兩個datetime相減,得到datetime.timedelta類型
            create_time = user_ex.valid_time
            td = timezone.now() - create_time
            if td.seconds < 60:
                data['message'] = u'1分鍾內發送過一次驗證碼'
                raise Exception(data['message'])

        # 發送郵件
        subject = ""
        if type == "register":
            subject = u'[sa.ssotest.net]激活您的帳號'
        elif type == "forget_pwd":
            subject = u'[sa.ssotest.net]重置密碼'
        message = u"""
            <h2>運維平台(<a href='http://sa.ssotest.net/' target=_blank>sa.ssotest.net</a>)<h2><br />
<table border="1px" cellpadding="3" cellspacing="0">
    <thead></thead>
        <tr bgcolor="#d3d3d3">
            <th>郵箱</th>
            <th>驗證碼</th>
        </tr>
    <tbody>
        <tr>
            <td>%s</td>
            <td>%s</td>
        </tr>
    </tbody>
</table>
            <br/><span style="color: red;font-size: medium">請保管好您的驗證,有效期10分鍾</span>
            """ % (email, code)

        mail_status = send_mail(subject, message, email, email)
        if mail_status == 200:
            # 郵件發送成功才入庫驗證碼
            models.User_ex.objects.filter(email=email).delete()
            models.User_ex.objects.create(email=email, valid_code=code)
            data['success'] = True
            data['message'] = 'OK'
        else:
            data['success'] = False
            data['message'] = "郵件發送失敗,錯誤碼:%s" % mail_status
            raise Exception(data['message'])
    except Exception:
        pass
    finally:
        return HttpResponse(json.dumps(data), content_type="application/json")

驗證碼驗證成功后,通過ldaphelper修改ldap中的對應用戶密碼,我這邊注釋的部分就是未使用ldap,使用數據庫入庫的方式,可作參考

def forget_pwd(request):
    error = ''
    forgetpwd_form = ForgetPwdForm(request.POST)
    try:
        if request.method == 'POST':
            check = request.POST.get('checkcode', None).lower()
            email = request.POST.get('email', None)
            if not email:
                error = '請輸入郵箱.'
                raise Exception('請輸入郵箱.')
            username = email.rsplit('@', 1)[0]
            email_obj = models.User_ex.objects.get(email=email)
            email_check = email_obj.valid_code.lower()
            # 10分鍾有效期
            email_check_time = email_obj.valid_time
            if (timezone.now() - email_check_time).seconds >= 600:
                error = '驗證碼失效.'
            elif check != email_check:
                error = '驗證碼錯誤.'
            else:
                if not forgetpwd_form.is_valid():
                    error = '格式錯誤.'
                else:
                    data = forgetpwd_form.clean()
                    ldap_obj = LDAPTool()
                    # if not models.MyUser.objects.filter(Q(username=username) | Q(email=email)):
                    if ldap_obj.check_user_status(uid=username) == 404:
                        error = '此郵箱未被注冊過.'
                    else:  # 重置密碼成功
                        ldap_obj.ldap_update_password(uid=username, new_password=data.get('password', ''))
                        # user_obj = models.MyUser.objects.get(username=username,
                        #                                      email=data.get('email', ''), )
                        # user_obj.set_password(data.get('password', ''))
                        # user_obj.save()
                        target = '%s?from=%s&username=%s' % ('/account/login/', 'forget_pwd', username)
                        return redirect(target)
    except Exception, e:
        logger.error(str(e))
    return render(request, 'account/forget_pwd.html', {'model': forgetpwd_form, 'error': error})

綜上,我們已經可以和ldap結合的比較好了,其中django-auth-ldap實現的功能,一般像一些成熟的軟件,如csvn、jira、openvpn都可以通過配置來實現,python-ldap實現的部分是這個系統的核心所在,通過我們的平台對ldap進行操作,當然也可以使用ldap管理后台(我這邊用的lam)來實現類似功能。
后邊還有單點登錄的實現部分,請看后續文章。
[原創]django+ldap實現單點登錄(裝飾器和緩存)

參考資料

http://www.vpsee.com/2012/11/use-python-ldap-to-create-read-delete-upgrade-ldap-entries/
http://www.grotan.com/ldap/python-ldap-samples.html#bind


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM