前言
接上篇文章 [原創]django+ldap實現統一認證部分一(django-auth-ldap實踐) 繼續實現我們的統一認證
python-ldap
我在sso項目的backend/lib/common/下添加一個ldaphelper.py文件,其中定義一個類
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import ldap
import ldap.modlist as modlist
# 加載log配置
import logging
logger = logging.getLogger()
import sys
reload(sys)
sys.setdefaultencoding('utf8')
'''
實現LDAP用戶登錄驗證,首先獲取用戶的dn,然后再驗證用戶名和密碼
'''
from SSOadmin import settings
# 登陸 地址
LDAP_URI = settings.AUTH_LDAP_SERVER_URI
# 登陸 賬戶
LDAP_USER = settings.AUTH_LDAP_BIND_DN
# 登陸 密碼
LDAP_PASS = settings.AUTH_LDAP_BIND_PASSWORD
# 默認 區域
BASE_DN = settings.base_dn
class LDAPTool(object):
def __init__(self,
ldap_uri=None,
base_dn=None,
user=None,
password=None):
"""
初始化
:param ldap_uri: ldap_uri
:param base_dn: 區域
:param user: 默認用戶
:param password: 默認密碼
:return:
"""
if not ldap_uri:
ldap_uri = LDAP_URI
if not base_dn:
self.base_dn = BASE_DN
if not user:
self.admin_user = LDAP_USER
if not password:
self.admin_password = LDAP_PASS
try:
self.ldapconn = ldap.initialize(ldap_uri) # 老版本使用open方法
self.ldapconn.simple_bind(self.admin_user, self.admin_password) # 綁定用戶名、密碼
except ldap.LDAPError, e:
logger.error('ldap conn失敗,原因為: %s' % str(e))
def ldap_search_dn(self, value=None, value_type='uid'):
"""
# 根據表單提交的用戶名,檢索該用戶的dn,一條dn就相當於數據庫里的一條記錄。
# 在ldap里類似cn=username,ou=users,dc=gccmx,dc=cn,驗證用戶密碼,必須先檢索出該DN
:param value: 用戶 uid或 組cn
:param value_type: 用戶 uid|cn
:return: search result
"""
obj = self.ldapconn
obj.protocal_version = ldap.VERSION3
searchScope = ldap.SCOPE_SUBTREE
retrieveAttributes = None
if value_type == 'cn':
searchFilter = "cn=" + value
else:
searchFilter = "uid=" + value
try:
ldap_result_id = obj.search(
base=self.base_dn,
scope=searchScope,
filterstr=searchFilter,
attrlist=retrieveAttributes
)
result_type, result_data = obj.result(ldap_result_id, 0)
if result_type == ldap.RES_SEARCH_ENTRY:
return result_data
else:
return None
except ldap.LDAPError, e:
logger.error('ldap search %s 失敗,原因為: %s' % (value, str(e)))
def ldap_get_user(self, uid=None):
"""
通過查詢用戶uid,從ldap_search_dn進一步提取所需數據,search到的是全部信息
:param uid:
:return: {‘uid’:'zhangsan','mail':'zhangsan@xxx.com','cn':'張三'}
"""
result = None
try:
search = self.ldap_search_dn(value=uid, value_type=uid)
if search is None:
raise ldap.LDAPError('未查詢到相應 id')
for user in search:
if user[1]['uid'][0] == uid:
result = {
'uid': uid,
'mail': user[1]['mail'][0],
'cn': user[1]['cn'][0],
}
except Exception, e:
logger.error('獲取用戶%s 失敗,原因為: %s' % (uid, str(e)))
return result
def __ldap_getgid(self, cn="員工"):
"""
查詢 組cn對應的gid
:param cn: 組cn
:return: 對應cn的gidNumber
"""
obj = self.ldapconn
obj.protocal_version = ldap.VERSION3
searchScope = ldap.SCOPE_SUBTREE
retrieveAttributes = None
searchFilter = "cn=" + cn
try:
ldap_result_id = obj.search(
base="ou=Group,%s" % self.base_dn,
scope=searchScope,
filterstr=searchFilter,
attrlist=retrieveAttributes
)
result_type, result_data = obj.result(ldap_result_id, 0)
if result_type == ldap.RES_SEARCH_ENTRY:
return result_data[0][1].get('gidNumber')[0]
else:
return None
except ldap.LDAPError, e:
logger.error('獲取gid失敗,原因為: %s' % str(e))
def __get_max_uidNumber(self):
"""
查詢 當前最大的uid,這個是在添加用戶時,用於自增uid
:param: None
:return: max uidNumber
"""
obj = self.ldapconn
obj.protocal_version = ldap.VERSION3
searchScope = ldap.SCOPE_SUBTREE
retrieveAttributes = ['uidNumber']
searchFilter = "uid=*"
try:
ldap_result = obj.search(
base="ou=People,%s" % self.base_dn,
scope=searchScope,
filterstr=searchFilter,
attrlist=retrieveAttributes
)
result_set = []
while True:
result_type, result_data = obj.result(ldap_result, 0)
if not result_data:
break
else:
if result_type == ldap.RES_SEARCH_ENTRY:
result_set.append(int(result_data[0][1].get('uidNumber')[0]))
return max(result_set) + 1
except ldap.LDAPError, e:
logger.error('獲取最大uid失敗,原因為: %s' % str(e))
def ldap_add_user(self, cn, mail, username, password):
"""
添加ldap用戶
:param cn: 中文名, mail: 郵箱, username: 用戶名, password: 密碼
:return: True/None
"""
result = None
try:
obj = self.ldapconn
obj.protocal_version = ldap.VERSION3
addDN = "uid=%s,ou=People,%s" % (username, BASE_DN)
attrs = {}
attrs['objectclass'] = ['top', 'person', 'inetOrgPerson', 'posixAccount', 'organizationalPerson']
attrs['cn'] = str(cn)
attrs['homeDirectory'] = str('/home/%s' % username)
attrs['loginShell'] = '/bin/bash'
attrs['mail'] = str(mail)
attrs['sn'] = str(username)
attrs['uid'] = str(username)
attrs['userPassword'] = str(password)
attrs['uidNumber'] = str(self.__get_max_uidNumber())
attrs['gidNumber'] = self.__ldap_getgid(cn='員工')
ldif = ldap.modlist.addModlist(attrs)
obj.add_s(addDN, ldif)
obj.unbind_s()
result = True
except ldap.LDAPError, e:
logger.error("生成用戶%s 失敗,原因為: %s" % (username, str(e)))
return result
def check_user_belong_to_group(self, uid, group_cn='員工'):
"""
查詢 用戶 是否歸屬於某個組
:param uid: 用戶uid , Ex: 'ssoadmin'
:param group_cn: 歸屬組cn , Ex: '黑名單'
:return: True|None
"""
result = None
try:
search = self.ldap_search_dn(value=group_cn, value_type='cn')
if search is None:
raise ldap.LDAPError('未查詢到相應 id')
member_list = search[0][1].get('memberUid', [])
if uid in member_list:
result = True
except ldap.LDAPError, e:
logger.error('獲取用戶%s與組%s關系失敗,原因為: %s' % (uid, group_cn, str(e)))
return result
def check_user_status(self, uid):
"""
驗證用戶狀態
:param uid: 用戶uid
:return: 200: 用戶可用
404: 用戶不存在
403: 用戶被禁用
"""
result = 404
try:
target_cn = self.ldap_get_user(uid=uid)
if target_cn is None: # 如未查到用戶,記錄日志,但不算錯誤,后邊有很多地方會驗證用戶是否存在
result = 404
logger.debug("%s uid未查詢到" % uid)
else:
if self.check_user_belong_to_group(uid=uid, group_cn='黑名單'):
result = 403
else:
result = 200
except ldap.LDAPError, e:
logger.error("%s 檢查用戶狀態失敗,原因為: %s" % (uid, str(e)))
return result
def ldap_update_password(self, uid, new_password):
"""
更新密碼
:param uid: 用戶uid,新password
:return: True|None
"""
result = None
try:
obj = self.ldapconn
obj.protocal_version = ldap.VERSION3
modifyDN = "uid=%s,ou=People,%s" % (uid, BASE_DN)
# 因為是更新密碼,如用passwd_s方法需要oldpassword,如果用下邊方法,是增加一個新密碼,而不是替換,而我們的需求是重置密碼
# old_password = {'userPassword': ''}
# new_password = {'userPassword': new_password}
# ldif = modlist.modifyModlist(old_password, new_password)
# obj = modlist.modifyModlist(modifyDN, ldif)
# 以下方法實現密碼替換的效果,第二個參數就是要替換的屬性名,可以變更其他屬性
obj.modify_s(modifyDN, [(ldap.MOD_REPLACE, 'userPassword', [str(new_password)])])
obj.unbind_s()
result = True
except ldap.LDAPError, e:
logger.error("%s 密碼更新失敗,原因為: %s" % (uid, str(e)))
return result
# for test
def main():
# print type(LDAPTool().get_max_uidNumber())
# print(LDAPTool().ldap_search_dn(value='qudong'))
# print(LDAPTool().check_user_belong_to_group('qudong', '黑名單'))
# print LDAPTool().ldap_get_user(uid='qudong')
# print(LDAPTool().check_user_and_email(uid='qudong', email='qudong@ssotest.com'))
# s=LDAPTool()
# print s.ldap_add_user('哈嘍2','test222@ssotest.com','test222','test222')
# print(s._LDAPTool__ldap_getgid('黑名單'))
# print(LDAPTool().ldap_update_password('qudong','111111'))
pass
if __name__ == '__main__':
main()
view調用方面,就沒什么太多說的了,就是把原先對本地數據庫的user表的增刪改查,移植到ldaphelper上即可,下邊以我的修改密碼的邏輯為例做以說明,其他的包括注冊、登錄都大同小異了。修改密碼還涉及到一個驗證碼發送和驗證的邏輯,可以用於參考
models.py,這里自定義了user表,參考Django admin定制化,User字段擴展[原創],自定義擴展user表,額外就多了一張驗證碼表
class MyUser(AbstractUser):
name = models.CharField(u'中文名', max_length=32, blank=False, null=False)
class Meta:
verbose_name = u'用戶詳情'
verbose_name_plural = u"用戶詳情"
class User_ex(models.Model):
"""User models ex"""
email = models.EmailField(unique=True, blank=False, null=False)
valid_code = models.CharField(max_length=24) #驗證碼
valid_time = models.DateTimeField(auto_now=True) #驗證碼有效時間
class Meta:
verbose_name = u'驗證碼'
verbose_name_plural = u"驗證碼"
def __unicode__(self):
return u'%s' % self.valid_code
獲取驗證碼的view,生成驗證碼,入庫,發送驗證碼郵件,發送郵件的方法send_mail在其他地方寫好了
from web_sso.forms.account import LoginForm, RegisterForm, ForgetPwdForm
from web_sso import models
from backend.lib.common.sendemail import send_mail
from backend.lib.common.ldaphelper import LDAPTool
import logging
......
def get_email_code(request):
"""get email code"""
email = request.GET.get('email', '')
type = request.GET.get('type', '')
code = ''.join(random.sample(string.digits + string.letters, 4))
data = {}
data['success'] = False
data['message'] = ''
try:
# 檢查郵箱
username = email.rsplit('@', 1)[0]
user_count = LDAPTool().check_user_status(uid=username)
# user_count = models.MyUser.objects.filter(email=email).count()
if type == "register":
if user_count != 404:
data['success'] = False
data['message'] = u'此用戶%s已被注冊過' % username
raise Exception(data['message'])
elif type == "forget_pwd":
if user_count == 404:
data['success'] = False
data['message'] = u'此郵箱未被注冊過'
raise Exception(data['message'])
# 檢查短時間內是否有生成過驗證碼
user_ex = models.User_ex.objects.filter(email=email)
if len(user_ex) > 0:
user_ex = user_ex[0]
# 兩個datetime相減,得到datetime.timedelta類型
create_time = user_ex.valid_time
td = timezone.now() - create_time
if td.seconds < 60:
data['message'] = u'1分鍾內發送過一次驗證碼'
raise Exception(data['message'])
# 發送郵件
subject = ""
if type == "register":
subject = u'[sa.ssotest.net]激活您的帳號'
elif type == "forget_pwd":
subject = u'[sa.ssotest.net]重置密碼'
message = u"""
<h2>運維平台(<a href='http://sa.ssotest.net/' target=_blank>sa.ssotest.net</a>)<h2><br />
<table border="1px" cellpadding="3" cellspacing="0">
<thead></thead>
<tr bgcolor="#d3d3d3">
<th>郵箱</th>
<th>驗證碼</th>
</tr>
<tbody>
<tr>
<td>%s</td>
<td>%s</td>
</tr>
</tbody>
</table>
<br/><span style="color: red;font-size: medium">請保管好您的驗證,有效期10分鍾</span>
""" % (email, code)
mail_status = send_mail(subject, message, email, email)
if mail_status == 200:
# 郵件發送成功才入庫驗證碼
models.User_ex.objects.filter(email=email).delete()
models.User_ex.objects.create(email=email, valid_code=code)
data['success'] = True
data['message'] = 'OK'
else:
data['success'] = False
data['message'] = "郵件發送失敗,錯誤碼:%s" % mail_status
raise Exception(data['message'])
except Exception:
pass
finally:
return HttpResponse(json.dumps(data), content_type="application/json")
驗證碼驗證成功后,通過ldaphelper修改ldap中的對應用戶密碼,我這邊注釋的部分就是未使用ldap,使用數據庫入庫的方式,可作參考
def forget_pwd(request):
error = ''
forgetpwd_form = ForgetPwdForm(request.POST)
try:
if request.method == 'POST':
check = request.POST.get('checkcode', None).lower()
email = request.POST.get('email', None)
if not email:
error = '請輸入郵箱.'
raise Exception('請輸入郵箱.')
username = email.rsplit('@', 1)[0]
email_obj = models.User_ex.objects.get(email=email)
email_check = email_obj.valid_code.lower()
# 10分鍾有效期
email_check_time = email_obj.valid_time
if (timezone.now() - email_check_time).seconds >= 600:
error = '驗證碼失效.'
elif check != email_check:
error = '驗證碼錯誤.'
else:
if not forgetpwd_form.is_valid():
error = '格式錯誤.'
else:
data = forgetpwd_form.clean()
ldap_obj = LDAPTool()
# if not models.MyUser.objects.filter(Q(username=username) | Q(email=email)):
if ldap_obj.check_user_status(uid=username) == 404:
error = '此郵箱未被注冊過.'
else: # 重置密碼成功
ldap_obj.ldap_update_password(uid=username, new_password=data.get('password', ''))
# user_obj = models.MyUser.objects.get(username=username,
# email=data.get('email', ''), )
# user_obj.set_password(data.get('password', ''))
# user_obj.save()
target = '%s?from=%s&username=%s' % ('/account/login/', 'forget_pwd', username)
return redirect(target)
except Exception, e:
logger.error(str(e))
return render(request, 'account/forget_pwd.html', {'model': forgetpwd_form, 'error': error})
綜上,我們已經可以和ldap結合的比較好了,其中django-auth-ldap實現的功能,一般像一些成熟的軟件,如csvn、jira、openvpn都可以通過配置來實現,python-ldap實現的部分是這個系統的核心所在,通過我們的平台對ldap進行操作,當然也可以使用ldap管理后台(我這邊用的lam)來實現類似功能。
后邊還有單點登錄的實現部分,請看后續文章。
[原創]django+ldap實現單點登錄(裝飾器和緩存)
參考資料
http://www.vpsee.com/2012/11/use-python-ldap-to-create-read-delete-upgrade-ldap-entries/
http://www.grotan.com/ldap/python-ldap-samples.html#bind