Python操作AD域服務器進行組織和用戶的查詢和添加


 

由於工作中有時候會遇到需要對AD域服務器進行批量添加用戶和組織的操作,平時都是通過bat批處理對csv文件中的用戶和組織進行操作添加,但是操作起來還是略麻煩,就想自己動手用Python代碼寫個更好操作的方式,隨便百度了下,還真的有相關的庫——ldap3,先寫點demo,后面再完善下吧。
基本操作方法:

from ldap3 import Server, Connection, ALL, NTLM

# 連接
server = Server('192.168.214.93', get_info=ALL)
conn = Connection(server, user='TEST\\administrator', password='Winhong1234@#test', auto_bind=True, authentication=NTLM)
print(server.info)

# 查詢
res = conn.search('dc=test,dc=csc,dc=com', '(objectclass=user)', attributes=['objectclass'])
print(conn.result)  # 查詢失敗的原因
print(conn.entries)  # 查詢到的數據

# 增加組織
res = conn.add('OU=python,OU=cibuser,DC=test,DC=csc,DC=com', object_class='OrganizationalUnit')
if res:
	print('增加組織成功!')
else:
	print('增加組織發生錯誤')
	if conn.result['description'] == 'entryAlreadyExists':
		print('--該組織已存在')

# 增加用戶
user01 = {
	'displayName' : 'python測試用戶01',  # 顯示名稱
	'userPrincipalName' : 'python_user_01@test.csc.com',  # 登錄名
	'userAccountControl': '544',  # 啟用賬號
	'sAMAccountName': 'python_user_01',  # 登錄名
	'pwdLastSet': -1  # 取消下次登錄修改密碼
}
res = conn.add('CN=python_user_01,OU=python,OU=cibuser,DC=test,DC=csc,DC=com', object_class='user',
      attributes=user01)
# attributes支持的字段可以通過server.schema.object_classes['user']獲取
print(res)
print(conn.result)
if res:
	print('增加用戶成功!')
else:
	print('增加用戶發生錯誤')
	if conn.result['description'] == 'entryAlreadyExists':
		print('--該用戶已存在')

下面是准備改寫成類:

 1 #!/usr/bin/env python
 2 # coding=UTF-8
 3 '''
 4 @Author: wjx
 5 @Description: AD域
 6 @Date: 2018-12-23 21:23:57
 7 @LastEditTime: 2019-03-28 23:46:56
 8 '''
 9 from ldap3 import Server, Connection, ALL, NTLM
10 
11 class Adoper():
12     '''
13     操作AD域的類
14     '''
15     def __init__(self, domain, ip, admin='administrator', pwd=None):
16         '''
17         domain: 域名,格式為:xxx.xxx.xxx
18         ip: ip地址,格式為:192.168.214.1
19         admin: 管理員賬號
20         pwd: 管理員密碼
21         '''
22         self.domain = domain
23         self.DC = ','.join(['DC=' + dc for dc in domain.split('.')]) # csc.com -> DC=csc,DC=com
24         self.pre = domain.split('.')[0].upper()  # 用戶登陸的前綴
25         self.ip = ip
26         self.admin = admin
27         self.pwd = pwd
28         self.server = Server(self.ip, get_info=ALL)
29         self.conn = Connection(self.server, user=self.pre+'\\'+self.admin, password=self.pwd, auto_bind=True, authentication=NTLM)
30 
31     def search(self, org):
32         '''
33         查詢組織下的用戶
34         org: 組織,格式為:aaa.bbb 即bbb組織下的aaa組織,不包含域地址
35         '''
36         att_list = ['displayName', 'userPrincipalName','userAccountControl','sAMAccountName','pwdLastSet']
37         org_base = ','.join(['OU=' + ou for ou in org.split('.')]) + ',' + self.DC
38         res = self.conn.search(search_base=org_base,
39                                 search_filter='(objectclass=user)', # 查詢數據的類型
40                                 attributes=att_list,                 # 查詢數據的哪些屬性
41                                 paged_size=1000)                     # 一次查詢多少數據
42         if res:
43             for user in self.conn.entries:
44                 yield user['displayName']
45         else:
46             print('查詢失敗: ', self.conn.result['description'])
47             return None
48 
49     def add_org(self, org):
50         '''
51         增加組織
52         oorg: 組織,格式為:aaa.bbb 即bbb組織下的aaa組織,不包含域地址
53         '''
54         org_base = ','.join(['OU=' + ou for ou in org.split('.')]) + ',' + self.DC
55         res = self.conn.add(org_base, object_class='OrganizationalUnit') # 成功返回True,失敗返回False
56         if res:
57             print(f'增加組織[ {org} ]成功!')
58         else:
59             print(f'增加組織[ {org} ]發生錯誤: ', self.conn.result['description'])
60 
61     def add_user(self, org, name, uid):
62         '''
63         增加用戶
64         org:增加到該組織下
65         name:顯示名稱
66         uid:賬號
67         '''
68         org_base = ','.join(['OU=' + ou for ou in org.split('.')]) + ',' + self.DC
69         user_att = {
70             'displayName' : name,
71             'userPrincipalName' : uid + '@' + self.domain,  # uid@admin組成登錄名
72             'userAccountControl': '544',                      # 啟用賬號
73             'sAMAccountName': uid,
74             'pwdLastSet': -1                                  # 取消下次登錄需要修改密碼
75         }
76         res = self.conn.add(f'CN={uid},{org_base}', object_class='user',
77               attributes=user_att)
78         if res:
79             print(f'增加用戶[ {name} ]成功!')
80         else:
81             print(f'增加用戶[ {name} ]發生錯誤:', self.conn.result['description'])
82 
83 
84 if __name__ == '__main__':
85     ad93 = Adoper(domain='test.csc.com', ip='192.168.214.93', pwd='Winhong1234@#test')
86     for user in ad93.search('信息科技部.總行.cibuser'):
87         print(user)
88     ad93.add_org('python02.cibuser')
89     ad93.add_user('python02.cibuser', 'python03類用戶', 'python03')

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM