ldap3 官方文檔學習之增刪改查操作
前言
公司部門培訓用到 ldap3,布置了個作業,於是開始看官方文檔學習中。我是直接從 LDAP Operations 部分開始看的。
主要就是官方文檔提供了增刪改查的接口,需要看懂函數和參數,然后就會用了。
增加操作
官方 add 函數
def add(self,
dn,
object_class=None,
attributes=None,
controls=None)
逐個參數解釋:
-
dn:標識要添加的目標名字
-
object_class:要添加的標志類名稱,可以是包含一個單一值或一串字符串
-
attributes:一個以 {‘attr1’: ‘val1’, ‘attr2’: ‘val2’, …} or {‘attr1’: [‘val1’, ‘val2’, …], …} 多值形式的字典
-
controls:發送請求額外的信息
舉例
# import class and constants
from ldap3 import Server, Connection, ALL
# define the server
s = Server('servername', get_info=ALL) # define an unsecure LDAP server, requesting info on DSE and schema
# define the connection
c = Connection(s, user='user_dn', password='user_password')
# perform the Add operation
c.add('cn=user1,ou=users,o=company', ['inetOrgPerson', 'posixGroup', 'top'], {'sn': 'user_sn', 'gidNumber': 0})
# equivalent to 等同上面
c.add('cn=user1,ou=users,o=company', attributes={'objectClass': ['inetOrgPerson', 'posixGroup', 'top'], 'sn': 'user_sn', gidNumber: 0})
print(c.result)
# close the connection
c.unbind()
主要就是 add 函數傳三個參數:dn、object_class、attributes。
- dn 包含用戶cn、ou、o等信息
刪除操作
官方 delete 函數
def delete(self,
dn,
controls=None):
逐個參數解釋:
- dn:標識要刪除的目標名字
- controls:發送請求額外的信息
舉例
from ldap3 import Server, Connection, ALL
# define the server
s = Server('servername', get_info=ALL) # define an unsecure LDAP server, requesting info on DSE and schema
# define the connection
c = Connection(s, user='user_dn', password='user_password')
# perform the Delete operation
c.delete('cn=user1,ou=users,o=company')
print(c.result)
# close the connection
c.unbind()
主要就是 delete 函數傳一個參數:dn。
- dn 包含用戶cn、ou、o等信息
修改操作
官方 modify 函數
def modify(self,
dn,
changes,
controls=None):
逐個參數解釋:
- dn:標識要刪除的目標名字
- changes:一個要被展示在具體入口的修改的字典
- controls:發送請求額外的信息
舉例
# import class and constants
from ldap3 import Server, Connection, ALL, MODIFY_REPLACE
# define the server
s = Server('servername', get_info=ALL) # define an unsecure LDAP server, requesting info on DSE and schema
# define the connection
c = Connection(s, user='user_dn', password='user_password')
c.bind()
# perform the Modify operation
c.modify('cn=user1,ou=users,o=company',
{'givenName': [(MODIFY_REPLACE, ['givenname-1-replaced'])],
'sn': [(MODIFY_REPLACE, ['sn-replaced'])]})
print(c.result)
# close the connection
c.unbind()
主要就是 modify 函數傳兩個參數:dn、changes。
- dn 包含用戶cn、ou、o等信息
- changes 包含修改前的數據類別和修改后的數據
查詢操作
官方 search 函數
def search(self,
search_base,
search_filter,
search_scope=SUBTREE,
dereference_aliases=DEREF_ALWAYS,
attributes=None,
size_limit=0,
time_limit=0,
types_only=False,
get_operational_attributes=False,
controls=None,
paged_size=None,
paged_criticality=False,
paged_cookie=None):
逐個參數解釋:
- search_base:查詢請求的基礎
- search_filter:查詢請求的過濾器,必須服從 LDAP 過濾語法 RFC4515 標准
- search_scope:具體指定查詢內容的部分
- BASE:查詢 search_base 中指定的條目的屬性。
- LEVEL:查詢 search_base 中包含的條目的屬性。基對象必須引用一個容器對象。
- SUBTREE:向下查詢 search_base 和所有附屬容器中指定的條目的屬性。
- attributes:查詢返回的單個屬性或屬性列表(默認為None)。如果屬性為None,則不返回任何屬性。如果屬性是 ALL_ATTRIBUTES 或 ALL_OPERATIONAL_ATTRIBUTES,則返回所有用戶屬性或所有操作屬性。
舉例
from ldap3 import Server, Connection, SUBTREE
total_entries = 0
server = Server('test-server')
c = Connection(server, user='username', password='password')
c.search(search_base = 'o=test',
search_filter = '(objectClass=inetOrgPerson)',
search_scope = SUBTREE,
attributes = ['cn', 'givenName'],
paged_size = 5)
total_entries += len(c.response)
for entry in c.response:
print(entry['dn'], entry['attributes'])
cookie = c.result['controls']['1.2.840.113556.1.4.319']['value']['cookie']
while cookie:
c.search(search_base = 'o=test',
search_filter = '(objectClass=inetOrgPerson)',
search_scope = SUBTREE,
attributes = ['cn', 'givenName'],
paged_size = 5,
paged_cookie = cookie)
total_entries += len(c.response)
cookie = c.result['controls']['1.2.840.113556.1.4.319']['value']['cookie']
for entry in c.response:
print(entry['dn'], entry['attributes'])
print('Total entries retrieved:', total_entries)
主要就是 search 函數傳四個參數:search_base、search_filter、search_scope、attributes。
- search_base:一般就是之前三個操作都要用到的 dn
- search_filter:默認為 '(objectClass=inetOrgPerson)'
- search_scope:我用的是 SUBTREE
- attributes:我用的是 ALL_ATTRIBUTES
增刪改查完整版代碼
增刪改查操作都是從 Excel 表格中讀取數據的。
# coding: utf8
import sys
import json
from ldap3 import Connection, Server, ALL, MODIFY_ADD, MODIFY_REPLACE, SUBTREE, ALL_ATTRIBUTES, ALL_OPERATIONAL_ATTRIBUTES
import utils
from config import BASE_DN, LDAP_TEST_CONFIG, LDAP_PROD_CONFIG, GROUP_DNS
host = "xxx.xxx.xxx.xxx"
port = xxx
user = "cn=xxx,dc=xx,dc=xxx"
password = "xxx"
# 創建連接
server = Server(host=host, port=port, get_info=ALL)
conn = Connection(server=server,
auto_bind=True,
read_only=False,
fast_decoder=True,
check_names=True,
user=user, password=password)
# 檢查連接是否成功
def test_connection():
print(server.info)
print(conn.user)
print(conn.extend.standard.who_am_i())
# 獲取用戶
def get_users():
conn.search(search_base="dc=xxx,dc=xxx", attributes=ALL_ATTRIBUTES,
search_filter='(objectclass=person)')
print(conn.result)
res = conn.response_to_json()
res = json.loads(res)['entries']
return res
print("===================測試鏈接====================")
test_connection()
print("\n\n\n===================打印用戶====================")
print(get_users())
ldap_config = {}
excel_file_path = "ldap-users-example.xlsx"
ldap_user_excel_file = excel_file_path
# 批量添加組織人員
def add_users():
# 讀取用戶excel信息
userattrs = utils.generate_ldap_userattrs(ldap_user_excel_file)
# print(userattrs)
print("\n\n\n===================開始添加用戶========================")
for user_dn, userattr in userattrs.items():
# 添加用戶
conn.add(user_dn, ['top', 'inetOrgPerson', 'posixAccount'], userattr)
res = conn.result
# 用戶已存在
if res['result'] != 0:
msg = res['description']
print("add user failed:%s res: %s" % (user_dn, res))
continue
# 添加用戶到用戶組,直接添加到 cn = xx組中,沒有這個操作的話就是不添加到 cn 中
for GROUP_DN in GROUP_DNS:
conn.modify(GROUP_DN, {'uniqueMember': [(MODIFY_ADD, user_dn)]})
# print("====modify result=====")
res = conn.result
msg = "success"
if res['result'] != 0:
msg = res['message']
print("add user to group:%s res: %s" % (GROUP_DN, msg))
continue
print(user_dn, "success")
print("===================添加用戶完畢========================")
# 批量刪除組織人員
def delete_users():
# 讀取用戶excel信息
userattrs = utils.generate_ldap_userattrs(ldap_user_excel_file)
# print(userattrs)
print("\n\n\n===================開始刪除用戶========================")
for user_dn, userattr in userattrs.items():
# 刪除用戶
conn.delete(user_dn)
res = conn.result
print(user_dn, "success")
print("===================刪除用戶完畢========================")
# 修改人員信息
def modify_users():
# 讀取用戶excel信息
userattrs = utils.generate_ldap_userattrs(ldap_user_excel_file)
# print(userattrs)
print("\n\n\n===================開始修改用戶信息========================")
for user_dn, userattr in userattrs.items():
# 修改用戶部門和工作地點
conn.modify(user_dn, {'physicalDeliveryOfficeName': [(MODIFY_REPLACE, ['武漢研發組'])],
'l': [(MODIFY_REPLACE, ['武漢'])]})
res = conn.result
print(user_dn, "success")
print("===================修改用戶信息完畢========================")
# 查詢人員信息
def search_users():
# 讀取用戶excel信息
userattrs = utils.generate_ldap_userattrs(ldap_user_excel_file)
# print(userattrs)
print("\n\n\n===================開始查詢用戶信息========================")
for user_dn, userattr in userattrs.items():
# 查詢用戶
print(user_dn)
status = conn.search(search_base = user_dn,
search_filter = '(objectClass=inetOrgPerson)',
search_scope = SUBTREE,
attributes = ALL_ATTRIBUTES)
if status:
print(user_dn, "success")
else:
print(user_dn, "failed")
print("===================查詢用戶信息完畢========================")
test_connection()
get_users()
search_users()
add_users()
delete_users()
modify_users()
search_users()
# close the connection
conn.unbind()