由於工作中有時候會遇到需要對AD域服務器進行批量添加用戶和組織的操作,平時都是通過bat批處理對csv文件中的用戶和組織進行操作添加,但是操作起來還是略麻煩,就想自己動手用Python代碼寫個更好操作的方式,隨便百度了下,還真的有相關的庫——ldap3,先寫點demo,后面再完善下吧。
基本操作方法:
from ldap3 import Server, Connection, ALL, NTLM
# 連接
server = Server('192.168.214.93', get_info=ALL)
conn = Connection(server, user='TEST\\administrator', password='Winhong1234@#test', auto_bind=True, authentication=NTLM)
print(server.info)
# 查詢
res = conn.search('dc=test,dc=csc,dc=com', '(objectclass=user)', attributes=['objectclass'])
print(conn.result) # 查詢失敗的原因
print(conn.entries) # 查詢到的數據
# 增加組織
res = conn.add('OU=python,OU=cibuser,DC=test,DC=csc,DC=com', object_class='OrganizationalUnit')
if res:
print('增加組織成功!')
else:
print('增加組織發生錯誤')
if conn.result['description'] == 'entryAlreadyExists':
print('--該組織已存在')
# 增加用戶
user01 = {
'displayName' : 'python測試用戶01', # 顯示名稱
'userPrincipalName' : 'python_user_01@test.csc.com', # 登錄名
'userAccountControl': '544', # 啟用賬號
'sAMAccountName': 'python_user_01', # 登錄名
'pwdLastSet': -1 # 取消下次登錄修改密碼
}
res = conn.add('CN=python_user_01,OU=python,OU=cibuser,DC=test,DC=csc,DC=com', object_class='user',
attributes=user01)
# attributes支持的字段可以通過server.schema.object_classes['user']獲取
print(res)
print(conn.result)
if res:
print('增加用戶成功!')
else:
print('增加用戶發生錯誤')
if conn.result['description'] == 'entryAlreadyExists':
print('--該用戶已存在')
下面是准備改寫成類:
1 #!/usr/bin/env python 2 # coding=UTF-8 3 ''' 4 @Author: wjx 5 @Description: AD域 6 @Date: 2018-12-23 21:23:57 7 @LastEditTime: 2019-03-28 23:46:56 8 ''' 9 from ldap3 import Server, Connection, ALL, NTLM 10 11 class Adoper(): 12 ''' 13 操作AD域的類 14 ''' 15 def __init__(self, domain, ip, admin='administrator', pwd=None): 16 ''' 17 domain: 域名,格式為:xxx.xxx.xxx 18 ip: ip地址,格式為:192.168.214.1 19 admin: 管理員賬號 20 pwd: 管理員密碼 21 ''' 22 self.domain = domain 23 self.DC = ','.join(['DC=' + dc for dc in domain.split('.')]) # csc.com -> DC=csc,DC=com 24 self.pre = domain.split('.')[0].upper() # 用戶登陸的前綴 25 self.ip = ip 26 self.admin = admin 27 self.pwd = pwd 28 self.server = Server(self.ip, get_info=ALL) 29 self.conn = Connection(self.server, user=self.pre+'\\'+self.admin, password=self.pwd, auto_bind=True, authentication=NTLM) 30 31 def search(self, org): 32 ''' 33 查詢組織下的用戶 34 org: 組織,格式為:aaa.bbb 即bbb組織下的aaa組織,不包含域地址 35 ''' 36 att_list = ['displayName', 'userPrincipalName','userAccountControl','sAMAccountName','pwdLastSet'] 37 org_base = ','.join(['OU=' + ou for ou in org.split('.')]) + ',' + self.DC 38 res = self.conn.search(search_base=org_base, 39 search_filter='(objectclass=user)', # 查詢數據的類型 40 attributes=att_list, # 查詢數據的哪些屬性 41 paged_size=1000) # 一次查詢多少數據 42 if res: 43 for user in self.conn.entries: 44 yield user['displayName'] 45 else: 46 print('查詢失敗: ', self.conn.result['description']) 47 return None 48 49 def add_org(self, org): 50 ''' 51 增加組織 52 oorg: 組織,格式為:aaa.bbb 即bbb組織下的aaa組織,不包含域地址 53 ''' 54 org_base = ','.join(['OU=' + ou for ou in org.split('.')]) + ',' + self.DC 55 res = self.conn.add(org_base, object_class='OrganizationalUnit') # 成功返回True,失敗返回False 56 if res: 57 print(f'增加組織[ {org} ]成功!') 58 else: 59 print(f'增加組織[ {org} ]發生錯誤: ', self.conn.result['description']) 60 61 def add_user(self, org, name, uid): 62 ''' 63 增加用戶 64 org:增加到該組織下 65 name:顯示名稱 66 uid:賬號 67 ''' 68 org_base = ','.join(['OU=' + ou for ou in org.split('.')]) + ',' + self.DC 69 user_att = { 70 'displayName' : name, 71 'userPrincipalName' : uid + '@' + self.domain, # uid@admin組成登錄名 72 'userAccountControl': '544', # 啟用賬號 73 'sAMAccountName': uid, 74 'pwdLastSet': -1 # 取消下次登錄需要修改密碼 75 } 76 res = self.conn.add(f'CN={uid},{org_base}', object_class='user', 77 attributes=user_att) 78 if res: 79 print(f'增加用戶[ {name} ]成功!') 80 else: 81 print(f'增加用戶[ {name} ]發生錯誤:', self.conn.result['description']) 82 83 84 if __name__ == '__main__': 85 ad93 = Adoper(domain='test.csc.com', ip='192.168.214.93', pwd='Winhong1234@#test') 86 for user in ad93.search('信息科技部.總行.cibuser'): 87 print(user) 88 ad93.add_org('python02.cibuser') 89 ad93.add_user('python02.cibuser', 'python03類用戶', 'python03')