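from __future__ import annotations

import contextlib

import ldap3
from ldap3.utils.conv import escape_filter_chars

# Allow-list of attributes returned for *user* objects by
# LDAPManager.search_info (group objects return everything except
# LDAPManager._GROUP_SKIP_ATTRIBUTES).
AD_USER_PROPERTIES: list[str] = [
    'userPrincipalName', 'department', 'sn', 'physicalDeliveryOfficeName',
    'msExchOWAPolicy', 'lastLogonTimestamp', 'mailNickname',
    'msExchMailboxAuditLastDelegateAccess', 'l', 'telephoneNumber',
    'publicDelegates', 'facsimileTelephoneNumber', 'displayName',
    'accountExpires', 'msExchLitigationHoldDate', 'msExchWhenMailboxCreated',
    'sAMAccountName', 'objectCategory', 'givenName', 'targetAddress',
    'badPasswordTime', 'c', 'mobile', 'swareIPAddresses',
    'msExchLitigationHoldOwner', 'objectClass', 'departmentNumber',
    'otherPager', 'memberOf', 'distinguishedName', 'logonCount', 'manager',
    'title', 'msExchMailboxAuditLastAdminAccess', 'mail', 'pwdLastSet',
    'employeeNumber', 'publicDelegatesBL', 'name',
    'msRTCSIP-PrimaryUserAddress', 'employeeType', 'showInAddressBook',
    'swareLastLogin', 'cn',
    # NOTE: the original list had 'proxyAddresses ' with a trailing space,
    # which never matched the real attribute name.
    'proxyAddresses', 'whenCreated', 'postalCode',
    'sAMAccountType', 'Group', 'whenChanged',
]

# The DataConverter class is NOT defined in this module; see
# https://www.cnblogs.com/luoyj2/articles/14241001.html.  It has to be
# defined or imported here before LDAPManager.search_info is called.


class LDAPManager(object):
    """Small wrapper around ldap3 for a few Active Directory operations.

    Every public method opens its own connection through ``_connection()``,
    which establishes StartTLS *before* the simple bind is sent, so the
    service-account password is never transmitted in clear text.  Methods
    never raise: they return a dict with a ``success`` flag and, on failure,
    an ``error`` string.

    Security notes:
      * Values that end up in LDAP search filters are escaped with
        ``escape_filter_chars`` (RFC 4515), so ``*``, ``(``, ``)`` or ``\\``
        in a caller-supplied name cannot widen or break the query.
      * ldap3's default ``Tls`` object does not validate the server
        certificate (``ssl.CERT_NONE``), so StartTLS alone protects against
        passive sniffing, not an active man-in-the-middle.  Pass
        ``tls=ldap3.Tls(validate=ssl.CERT_REQUIRED, ca_certs_file=...)`` to
        pin the domain controller's CA.
    """

    # Attributes dropped from group results (large / not useful to callers).
    _GROUP_SKIP_ATTRIBUTES = frozenset({'dSCorePropagationData', 'sIDHistory'})

    def __init__(self, host: str, username: str, password: str,
                 base_dn: str = 'DC=lenovo,DC=com',
                 user_base_dn: str | None = None,
                 tls: ldap3.Tls | None = None):
        """Store connection settings; no network activity happens here.

        Args:
            host: LDAP server host (or ``ldap://`` / ``ldaps://`` URL).
            username: Bind identity (e.g. ``DOMAIN\\user`` or a UPN).
            password: Bind password.
            base_dn: Search base for ``search_info``.
            user_base_dn: Search base used to resolve a sAMAccountName in
                ``modify_password``; defaults to ``OU=User Accounts,<base_dn>``.
            tls: Optional ``ldap3.Tls`` configuration used for StartTLS
                (see the class docstring for why you want one).
        """
        self.host = host
        self.username = username
        self.password = password
        self.base_dn = base_dn
        self.user_base_dn = user_base_dn or f'OU=User Accounts,{base_dn}'
        self.tls = tls

    @contextlib.contextmanager
    def _connection(self):
        """Yield a bound ``ldap3.Connection`` protected by StartTLS.

        Order matters: open -> start_tls -> bind.  The previous code entered
        ``with ldap3.Connection(...)`` (which binds on entry) and only then
        called ``start_tls()``, so the password had already crossed the wire
        in clear text.  On an ``ldaps://`` server ``start_tls()`` is a no-op.
        The connection is always unbound on exit, even if the body raises.

        Raises:
            RuntimeError: if the bind is rejected (``conn.result`` is
                included in the message).
        """
        server = ldap3.Server(self.host, get_info=ldap3.ALL, tls=self.tls)
        conn = ldap3.Connection(
            server,
            user=self.username,
            password=self.password,
            auto_bind=ldap3.AUTO_BIND_NONE,
        )
        try:
            conn.open()
            conn.start_tls()
            if not conn.bind():
                raise RuntimeError(f'bind failed: {conn.result}')
            yield conn
        finally:
            # unbind() is safe on a connection that never opened.
            conn.unbind()

    @staticmethod
    def _entries(conn) -> list[dict]:
        """Return only real entries (``searchResEntry``) from the last search.

        AD answers sub-tree searches at the domain root with referrals
        (``searchResRef``) as well as entries.  A referral carries no ``dn``
        or ``attributes``, so indexing ``conn.response[0]`` blindly, as the
        old code did, could pick a referral and crash with ``TypeError``.
        """
        return [r for r in (conn.response or [])
                if r.get('type') == 'searchResEntry']

    def search_info(self, cn: str) -> dict:
        """Look up an object by common name and return its attributes.

        Args:
            cn: Value matched against the ``CN`` attribute (escaped as a
                filter value before use).

        Returns:
            ``{'success': True, 'dn': <str>, 'attributes': {...}}`` on a hit.
            ``attributes['cnType']`` is ``'group'`` when the DN contains
            ``OU=Groups`` (every attribute except ``_GROUP_SKIP_ATTRIBUTES``
            is returned) and ``'user'`` otherwise (only names listed in
            ``AD_USER_PROPERTIES``).  Attribute values are passed through
            ``DataConverter(v).convert()``.  Any failure, including "not
            found", yields ``{'success': False, 'dn': '', 'attributes': {},
            'error': <str>}``.
        """
        result = {'success': False, 'dn': '', 'attributes': {}}
        try:
            with self._connection() as conn:
                conn.search(
                    search_base=self.base_dn,
                    search_filter=f'(CN={escape_filter_chars(cn)})',
                    attributes=[ldap3.ALL_ATTRIBUTES],
                )
                entries = self._entries(conn)
                if not entries:
                    raise LookupError(f'"{cn}" not found.')
                entry = entries[0]
                dn = entry.get('raw_dn')
                if isinstance(dn, bytes):
                    dn = dn.decode()
                result.update(dn=dn)

                raw_attrs = entry.get('attributes') or {}
                attributes = {}
                # Group vs. user is decided purely by the DN's OU, as before.
                if 'OU=Groups' in dn:
                    attributes['cnType'] = 'group'
                    keep = [k for k in raw_attrs
                            if k not in self._GROUP_SKIP_ATTRIBUTES]
                else:
                    attributes['cnType'] = 'user'
                    wanted = set(AD_USER_PROPERTIES)
                    keep = [k for k in raw_attrs if k in wanted]
                for k in keep:
                    attributes[k] = DataConverter(raw_attrs[k]).convert()
                result.update(success=True, attributes=attributes)
        except Exception as e:  # API boundary: callers get a dict, never an exception
            result.update(success=False, error=str(e))
        return result

    def add_users_to_groups(self, members_dn, groups_dn) -> dict:
        """Add member DN(s) to group DN(s).

        Returns ``{'success': bool}`` or ``{'success': False, 'error': str}``.
        """
        return self._group_membership('add_members_to_groups',
                                      members_dn, groups_dn)

    def remove_users_from_groups(self, members_dn, groups_dn) -> dict:
        """Remove member DN(s) from group DN(s).

        Returns ``{'success': bool}`` or ``{'success': False, 'error': str}``.
        """
        return self._group_membership('remove_members_from_groups',
                                      members_dn, groups_dn)

    def _group_membership(self, operation: str, members_dn, groups_dn) -> dict:
        """Run one of the ``conn.extend.microsoft`` membership operations."""
        try:
            with self._connection() as conn:
                op = getattr(conn.extend.microsoft, operation)
                return {'success': op(members_dn, groups_dn)}
        except Exception as e:
            return {'success': False, 'error': str(e)}

    def modify_password(self, it_code_or_dn: str, new_pass: str) -> dict:
        """Set a new password for a user given by sAMAccountName or DN.

        The argument is treated as a DN when it contains ``OU=``; otherwise
        it is looked up as a sAMAccountName under ``self.user_base_dn`` (the
        value is escaped before it is placed in the filter).  AD only accepts
        ``unicodePwd`` changes over an encrypted session, which the
        StartTLS-before-bind connection provides.

        Returns:
            ``{'success': bool}`` or ``{'success': False, 'error': str}``.
        """
        try:
            with self._connection() as conn:
                if 'OU=' in it_code_or_dn:
                    user_dn = it_code_or_dn
                else:
                    conn.search(
                        search_base=self.user_base_dn,
                        search_filter=(
                            f'(sAMAccountName={escape_filter_chars(it_code_or_dn)})'
                        ),
                    )
                    entries = self._entries(conn)
                    if not entries:
                        return {'success': False,
                                'error': f'"{it_code_or_dn}" not found'}
                    user_dn = entries[0].get('dn')
                success = conn.extend.microsoft.modify_password(user_dn, new_pass)
                return {'success': success}
        except Exception as e:
            return {'success': False, 'error': str(e)}


if __name__ == '__main__':
    import json
    import os

    # Credentials are read from the environment instead of being hard-coded;
    # the fallbacks are the original placeholder values.
    ad = LDAPManager(
        host=os.environ.get('AD_HOST', '10.10.10.10'),
        username=os.environ.get('AD_USERNAME', 'username'),
        password=os.environ.get('AD_PASSWORD', 'password'),
    )
    res = ad.search_info(os.environ.get('AD_LOOKUP_CN', 'test'))
    print(json.dumps(res, indent=4))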