python使用ldap3進行接口調用


把自己使用到的ldap調用的代碼分享出來,希望大家可以參考

#!/usr/bin/python
# -*- coding: utf-8 -*-
"""
@Time    : 2019/11/14 5:37 PM
@Author  : NoSong
@File    : LdapBaseApi.py
@Software: PyCharm
# 接口文檔: https://ldap3.readthedocs.io/
# https://ldap3.readthedocs.io/tutorial_operations.html#
"""

ldap_config = {
    'host': "192.168.13.133",
    'port': 389,
    'base_dn': 'dc=domain,dc=com',
    'user': 'admin',
    'password': 'admin',
}

from ldap3 import Server, Connection, SUBTREE, ALL_ATTRIBUTES
from ldap3.core.exceptions import LDAPBindError
from ldap3 import MODIFY_REPLACE
from ldap3.utils.dn import safe_rdn
import sys
reload(sys)
sys.setdefaultencoding('utf8')

class LDAP(object):
    def __init__(self, host, port, user, password, base_dn):
        dn = "cn=%s,%s" % (user, base_dn)
        self.server = Server(host=host, port=port)
        self.base_dn = base_dn
        self.__conn = Connection(self.server, dn, password, auto_bind=True)

    def add_ou(self, ou, oid):
        """
        參考: https://ldap3.readthedocs.io/tutorial_operations.html#create-an-entry
        添加oy
        :param ou: 'ou=測試部,dc=domain,dc=com' 或者 'ou=測試子部門,ou=測試部,dc=domain,dc=com'
        :param oid: 部門id保存至st中
        :return:
        """

        return self.__conn.add(ou, 'organizationalUnit', {"st": oid})

    def add_user(self, userid, username, mobile, mail, title, ou_dn, gidnumber=501, alias=None):
        """
        參考: https://ldap3.readthedocs.io/tutorial_operations.html#create-an-entry
        :param userid:     "linan"
        :param username:   "姓名" cn=姓名
        :param mobile:
        :param mail:       "xxx@domain.com"
        :param title:
        :param ou_dn:     "ou=運維中心,dc=domain,dc=com"
        :param gidnumber: 501 默認用戶組
        :return:
        """
        l = self.__conn
        objectclass = ['top', 'person', 'inetOrgPerson', 'posixAccount']
        add_dn = "cn=%s,%s" % (username, ou_dn)

        # 也可以隨機生成,我先隨便寫一個值,這個需要自己定義規則
        password = '%s@qwe' % userid
        uidNumber = '%s' % userid.strip("xxx")
        # 添加用戶
        s = l.add(add_dn, objectclass, {'mobile': mobile,
                                   'sn': userid,
                                   'mail': mail,
                                   'userPassword': password,
                                   'title': title,
                                   'uid': username,
                                   'gidNumber': gidnumber,
                                   'uidNumber': uidNumber,
                                   'homeDirectory': '/home/users/%s' % userid,
                                   'loginShell': '/bin/bash'
                                   })
        return s

    def get_oudn_by_st(self, st, base_dn=None):
        """
        根據 st值 獲取組織dn
        參考: https://ldap3.readthedocs.io/tutorial_searches.html
        :param base_dn:
        :param st:  部門id
        :return: entry
        """
        if not base_dn:
            base_dn = self.base_dn
        # 查詢ou 中 返回的信息 attribute 包含 st
        status = self.__conn.search(base_dn, '(objectclass=organizationalUnit)', attributes=["st"])
        if status:
            flag = False
            for i in self.__conn.entries:
                if st:
                    if st in i.entry_attributes_as_dict["st"]:

                        return i
            else:
                return False
        else:
            return False

    def get_object_classes_info(self, objec_classes):
        """
        獲取 Ldap中 object_classes的必要參數以及其他信息
        參考: https://ldap3.readthedocs.io/tutorial_searches.html
        :param objec_classes: objec_classes
        :return:
        """
        print self.server.schema.object_classes[objec_classes]

    def get_userdn_by_mail(self, mail, base_dn=None):
        """
        通過郵箱地址,獲取用戶dn。部分沒有郵箱地址的用戶被忽略,不能使用ldap認證
        參考: https://ldap3.readthedocs.io/tutorial_searches.html
        :param mail:
        :param base_dn:
        :return:
        """
        if not base_dn:
            base_dn = self.base_dn
        status = self.__conn.search(base_dn,
                                    search_filter='(mail={})'.format(mail),
                                    search_scope=SUBTREE,
                                    attributes=ALL_ATTRIBUTES,
                                    )
        if status:
            flag = False
            for i in self.__conn.entries:
                # print(i.entry_dn)
                return i
            else:
                return False
        else:
            return False

    def get_userdn_by_args(self, base_dn=None, **kwargs):
        """
        參考: https://ldap3.readthedocs.io/tutorial_searches.html
        獲取用戶dn, 通過 args
        可以支持多個參數: get_userdn_by_args(mail="xxx@domain.com", uid="姓名")
        會根據 kwargs 生成 search的內容,進行查詢: 多個條件是 & and查詢
        返回第一個查詢到的結果,
        建議使用唯一標識符進行查詢
        這個函數基本可以獲取所有類型的數據
        :param base_dn:
        :param kwargs:
        :return:
        """
        search = ""
        for k, v in kwargs.items():
            search += "(%s=%s)" % (k, v)
        if not base_dn:
            base_dn = self.base_dn
        if search:
            search_filter = '(&{})'.format(search)
        else:
            search_filter = ''
        status = self.__conn.search(base_dn,
                                    search_filter=search_filter,
                                    search_scope=SUBTREE,
                                    attributes=ALL_ATTRIBUTES
                                    )

        if status:
            return self.__conn.entries
        else:
            return False

    def authenticate_userdn_by_mail(self, mail, password):
        """
        驗證用戶名密碼
        通過郵箱進行驗證密碼
        :param mail:
        :param password:
        :return:
        """

        entry = self.get_userdn_by_mail(mail=mail)

        if entry:
            bind_dn = entry.entry_dn
            try:
                Connection(self.server, bind_dn, password, auto_bind=True)
                return True
            except LDAPBindError:
                return False

        else:
            print("user: %s not exist! " % mail)
            return False

    def update_user_info(self, user_dn, action=MODIFY_REPLACE, **kwargs):
        """

        :param dn: 用戶dn 可以通過get_userdn_by_args,get_userdn_by_mail 獲取
        :param action: MODIFY_REPLACE 對字段原值進行替換  MODIFY_ADD 在指定字段上增加值   MODIFY_DELETE 對指定字段的值進行刪除
        :param kwargs: 要進行變更的信息內容 uid userPassword mail sn gidNumber uidNumber mobile title
        :return:
        """
        allow_key  = "uid userPassword mail sn gidNumber uidNumber mobile title".split(" ")
        update_args = {}
        for k, v in kwargs.items():
            if k not in allow_key:
                msg = "字段: %s, 不允許進行修改, 不生效" % k
                print(msg)
                return False
            update_args.update({k: [(action, [v])]})
        print(update_args)
        status = self.__conn.modify(user_dn, update_args)
        return status

    def update_user_cn(self, user_dn, new_cn):
        """
        修改cn

        dn: cn=用戶,ou=運維部,ou=研發中心,dc=domain,dc=com
        rdn就是 cn=用戶
        Example:
            from ldap3.utils.dn import safe_rdn
            safe_rdn('cn=b.smith,ou=moved,ou=ldap3-tutorial,dc=demo1,dc=freeipa,dc=org')
            [cn=b.smith]

        :param dn:
        :param new_cn:
        :return:
        """
        s = self.__conn.modify_dn(user_dn, 'cn=%s' % new_cn)
        return s

    def update_ou(self, dn, new_ou_dn):
        """
        更換所在的OU
        :param dn: 要進行變動的DN
        :param new_ou_dn:  新的OU DN
        :return:
        """
        rdn = safe_rdn(dn)
        print(rdn)
        s = self.__conn.modify_dn(dn, rdn[0], new_superior=new_ou_dn)
        return s

    def delete_dn(self, dn):
        """
        要進行刪除的DN
        :param dn:
        :return:
        """
        # 如果不是以cn開頭的需要清理(刪除) sub-link
        if not dn.startswith("cn"):
            # 獲取dn 下所有 sub Person DN 進行刪除
            allUserEntry = self.get_userdn_by_args(base_dn=dn, objectClass="Person")
            if allUserEntry:
                for userentry in allUserEntry:
                    self.__conn.delete(userentry.entry_dn)
                    print("deleting ou %s and delete sub Person DN: %s" % (dn, userentry.entry_dn))
            # 獲取dn 下所有 sub    OU進行刪除
            allOuEntry = self.get_userdn_by_args(base_dn=dn, objectClass="organizationalUnit")
            if allOuEntry:
                for ouEntry in reversed(allOuEntry):
                    s = self.__conn.delete(ouEntry.entry_dn)
                    print("deleting ou %s and delete sub organizationalUnit DN: %s" % (dn, ouEntry.entry_dn))
        else:
            s = self.__conn.delete(dn)
        # print(self.__conn.result)
        return s



if __name__ == '__main__':
    ldapObj = LDAP(**ldap_config)
    # 同步企業微信 組織架構 to Ldap
    # 同步企業微信 User  To ldap
    # -------------------------------
    # 刪除DN, 對DN下的 sub 進行遞歸刪除
    # s = ldapObj.get_oudn_by_st("1")
    # status = ldapObj.delete_dn(s.entry_dn)
    # print(status)
    # -------------------------------
    # 驗證用戶密碼
    # s = ldapObj.authenticate_userdn_by_mail("linan@domain.com", "xxx9999@qwe")
    # - -----------------------------
    # 添加用戶
    # s = ldapObj.add_user("xxx9999", "李南", "190283812", "linan@domain.com", "運維",
    #                  ou_dn="ou=運維中心,dc=domain,dc=com")
    # --------------------------------
    # 查詢 ou st  組id
    # s = obj.get_oudn_by_st(st="1")
    # --------------------------------
    # 添加OU
    # obj.add_ou("ou=總部,dc=domain,dc=com", 1)
    # obj.add_ou("ou=研發中心,ou=總部,dc=domain,dc=com", 2)
    # --------------------------------
    # 查詢用戶是否存在 - 通過 mail  獲取用戶 dn_entry
    # ldapObj.get_userdn_by_mail(mail="linan@domain.com")
    # --------------------------------
    # 根據 參數 查詢用戶DN   data = [dn_entry, ...] ,多個參數為 &
    # data = ldapObj.get_userdn_by_args(cn="李南",mail="xxxx")
    # --------------------------------
    # 對指定dn 進行參數修改  多個參數可以一起修改
    # s = ldapObj.update_user_info(data[0].entry_dn, userPassword="123456")
    # --------------------------------
    # 對指定DN 變更 OU-DN
    # s = ldapObj.update_user_ou(data[0].entry_dn, s.entry_dn)
    # --------------------------------
    # 對指定DN 修改CN名稱
    # ldapObj.update_cn(data[0].entry_dn,new_cn="李南男")
    # --------------------------------
    # 獲取objectClass 詳細信息
    # ldapObj.get_object_classes_info("organizationalUnit")
    # ldapObj.get_object_classes_info("posixAccount")
    # ldapObj.get_object_classes_info("inetOrgPerson")
    # ldapObj.get_object_classes_info("person")
    # 沒有郵箱地址的用戶:

    s = ldapObj.get_userdn_by_args(ou="研發中心")
    data= ldapObj.get_userdn_by_args(base_dn=s[0].entry_dn, objectclass = "organizationalUnit")
    for i in data:
        print(i.entry_dn)
    # print(s)



免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM