ldap3:AD账号查询相关操作


import ldap3

AD_USER_PROPERTIES = [
    'userPrincipalName', 'department', 'sn', 'physicalDeliveryOfficeName', 'msExchOWAPolicy', 'lastLogonTimestamp',
    'mailNickname', 'msExchMailboxAuditLastDelegateAccess', 'l', 'telephoneNumber', 'publicDelegates',
    'facsimileTelephoneNumber', 'displayName', 'accountExpires', 'msExchLitigationHoldDate',
    'msExchWhenMailboxCreated', 'sAMAccountName', 'objectCategory', 'givenName', 'targetAddress', 'badPasswordTime',
    'c', 'mobile', 'swareIPAddresses', 'msExchLitigationHoldOwner', 'objectClass', 'departmentNumber', 'otherPager',
    'memberOf', 'distinguishedName', 'logonCount', 'manager', 'title', 'msExchMailboxAuditLastAdminAccess', 'mail',
    'pwdLastSet', 'employeeNumber', 'publicDelegatesBL', 'name', 'msRTCSIP-PrimaryUserAddress', 'employeeType',
    'showInAddressBook', 'swareLastLogin', 'cn', 'proxyAddresses ', 'whenCreated', 'postalCode', 'sAMAccountType',
    'Group', 'whenChanged'
]


# DataConverter 类见 https://www.cnblogs.com/luoyj2/articles/14241001.html

class LDAPManager(object):
    def __init__(self, host: str, username: str, password: str):
        self.host = host
        self.username = username
        self.password = password

    def search_info(self, cn: str):
        result = {
            'success': False,
            'dn': '',
            'attributes': {}
        }
        try:
            with ldap3.Connection(
                    ldap3.Server(self.host, get_info=ldap3.ALL),
                    user=self.username, password=self.password,
            ) as conn:
                try:
                    conn.search(
                        search_base='DC=lenovo,DC=com',
                        search_filter=f'(CN={cn})',
                        attributes=[ldap3.ALL_ATTRIBUTES]
                    )
                    if not conn.response:
                        raise Exception(f'"{cn}" not found.')
                    response = conn.response[0]
                    dn = response.get('raw_dn')
                    if isinstance(dn, bytes):
                        dn = dn.decode()

                    result.update(dn=dn)
                    attributes = {}
                    if 'OU=Groups' in dn:
                        attributes.update(cnType='group')
                        for k, v in response.get('attributes').items():
                            if k in [
                                'dSCorePropagationData', 'sIDHistory'
                            ]:
                                continue
                            attributes[k] = DataConverter(v).convert()
                    else:
                        attributes.update(cnType='user')
                        for k, v in response.get('attributes').items():
                            if k in AD_USER_PROPERTIES:
                                attributes[k] = DataConverter(v).convert()
                    result.update(success=True, attributes=attributes)
                except Exception as e:
                    raise e
        except Exception as e:
            result.update(success=False, error=str(e))
        return result

    def add_users_to_groups(self, members_dn, groups_dn):
        try:
            with ldap3.Connection(
                    server=ldap3.Server(self.host, get_info=ldap3.ALL),
                    user=self.username, password=self.password
            ) as conn:
                conn.start_tls()
                success = conn.extend.microsoft.add_members_to_groups(members_dn, groups_dn)
                return {'success': success}
        except Exception as e:
            return {'success': False, 'error': str(e)}

    def remove_users_from_groups(self, members_dn, groups_dn):
        try:
            with ldap3.Connection(
                    server=ldap3.Server(self.host, get_info=ldap3.ALL),
                    user=self.username, password=self.password
            ) as conn:
                conn.start_tls()
                success = conn.extend.microsoft.remove_members_from_groups(members_dn, groups_dn)
                return {'success': success}
        except Exception as e:
            return {'success': False, 'error': str(e)}

    def modify_password(self, it_code_or_dn, new_pass):
        try:
            with ldap3.Connection(
                    server=ldap3.Server(host=self.host, get_info=ldap3.ALL),
                    user=self.username, password=self.password
            ) as conn:
                conn.start_tls()
                if 'OU=' not in it_code_or_dn:
                    conn.search(
                        search_base='OU=User Accounts,DC=lenovo,DC=com',
                        search_filter='(sAMAccountName=%s)' % it_code_or_dn
                    )
                    if not conn.response:
                        return {'success': False, 'error': f'"{it_code_or_dn}" not found'}
                    user_dn = conn.response[0].get('dn')
                else:
                    user_dn = it_code_or_dn
                success = conn.extend.microsoft.modify_password(user_dn, new_pass)
                return {'success': success}
        except Exception as e:
            return {'success': False, 'error': str(e)}


if __name__ == '__main__':
    import json

    ad = LDAPManager(host='10.10.10.10', username='username', password='password')
    res = ad.search_info('test')
    print(json.dumps(res, indent=4))

 


免责声明!

本站转载的文章为个人学习借鉴使用,本站对版权不负任何法律责任。如果侵犯了您的隐私权益,请联系本站邮箱yoyou2525@163.com删除。



 
粤ICP备18138465号  © 2018-2025 CODEPRJ.COM