一. 简介
通过elasticsearch-dsl模块来操作es。
版本(通过 pip 安装即可):
elasticsearch==7.9.1
elasticsearch-dsl==7.3.0
1.1 建立es连接
下面是 es 的连接配置。可以根据需求启用 SSL 安全连接,这里没有启用。
# Connection settings for Elasticsearch and creation of the shared handle.
#
# NOTE(review): `ssl`, `create_default_context`, `disable_warnings` and
# `create_connection` are used below but this snippet shows no imports;
# presumably they come from `ssl`, `urllib3` and
# `elasticsearch_dsl.connections` -- confirm.

# Name constants.
WARN_LOG_INDEX = "ys-log"
WARN_EVENT_INDEX = "ys-event"
ATTACKER_INDEX = "ys-attacker"
ORG_INDEX = "ys-org"
VUL_NAME = "vulnerabilitys"
PATCH_NAME = "patchs"
EXPLOIT_NAME = "exploit"
ICS_NAME = "ics"
VUL_DRAW_NAME = "vul_draw_name"

# ES cluster settings
ES_HOST_LIST = ["192.168.99.215"]
ES_PROTOCAL = "https"
ES_PORT = 9200

# See es-docker-compose\certificates\.env
# NOTE(review): credentials are hard-coded here; consider loading them from
# the environment instead of committing them.
ES_UESR = "elastic"
ES_PWD = "d@#%1saE2sadc"

# Copied from es-docker-compose\certificates\ca\ca.crt
# The PEM body must keep one base64 chunk per line.
ES_CA = r"""-----BEGIN CERTIFICATE-----
MIIDSzCCAjOgAwIBAgIUbJ+EDMygpWJZqmLmCuL+5+y6FikwDQYJKoZIhvcNAQEL
BQAwNDEyMDAGA1UEAxMpRWxhc3RpYyBDZXJ0aWZpY2F0ZSBUb29sIEF1dG9nZW5l
cmF0ZWQgQ0EwIBcNMjAwNTIyMDI1NTI2WhgPMjIxNzA3MDkwMjU1MjZaMDQxMjAw
BgNVBAMTKUVsYXN0aWMgQ2VydGlmaWNhdGUgVG9vbCBBdXRvZ2VuZXJhdGVkIENB
MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAqtwhFfElRxjkmUXk729f
Ge842YABLSNskuPwB3oBwYt2WYG5r6OTUP2yMel4hFToYm48xfT0tQy/NkeMgi4x
EMHJbS7VHO/rCVMU2YW71cDPvT+XBgkudgmLD+PE91SB1cPQcJnX8AWSO3WkezrZ
cQACIuPyfEHsafVyAlNdyAfYpd+Qdr6CpMiE9pAnh9x0B7wqSAYuZpeaFyb0avgx
vsXUZYKzlIKql1k7m3owXGFZ1FluWzvpqRhBzsH5q8sHbSpr2LcnlTzR7lamZfPE
kp1xpsW++49NsRuT8S04GfPXnuKUto2JE+0Zf98YjQ5Elrmgq/Wa0VuUwQw9jOpy
UwIDAQABo1MwUTAdBgNVHQ4EFgQUmDMEiCUrO+BSqdfbpdhTotlCBSowHwYDVR0j
BBgwFoAUmDMEiCUrO+BSqdfbpdhTotlCBSowDwYDVR0TAQH/BAUwAwEB/zANBgkq
hkiG9w0BAQsFAAOCAQEAZ2W9EasnjyNMRrZap8wSsPOEljs+RkWHpWPWVUP5WTN2
x/tcMPrECTTYteyXsvlbx8pwl+Op2s0RTcqgG9MtxcEPk80Qo665TU3pc4AZPcux
lfEaqM3Jq6DbviJTn3iCi811V2V/bj8UK7mchquW3qeGJcUnQrioqaqAJptHGzg9
uKnIe4FeQcte8RUz4pUNcc0q/dHXMZir/nCdWsaWMcywMGHtetUUtMJ/wigL5lig
hlVxgh2REDxFD6h9Mbh8L8R/i/Zb2io3kl4jXeTlZeqHuBlNH1dRrPwgG7DMDqBU
OitsVO3ZN3I7YfWpgxDjQeOl1XlDjRICZlSqKT0Oow==
-----END CERTIFICATE-----"""

# SSL context built from the CA above. It is only used if the "ssl_context"
# entry below is uncommented.
context = create_default_context(cadata=ES_CA)
# check_hostname has to be switched off before verify_mode may be set to
# CERT_NONE.
# NOTE(review): with both turned off the server certificate is not verified,
# so the CA loaded above is not enforced -- confirm this is intended.
context.check_hostname = False
context.verify_mode = ssl.CERT_NONE
disable_warnings()

# Keyword arguments for create_connection. The commented-out entries are the
# SSL-related options, which are not enabled here.
ES_PALARM_DICT = {
    "hosts": ES_HOST_LIST,
    "sniff_on_start": False,  # test before connecting
    "sniff_timeout": 60,  # set the timeout
    "timeout": 300,
    "http_auth": (ES_UESR, ES_PWD),
    # "ssl_context": context,
    "port": ES_PORT,
    # "scheme": ES_PROTOCAL,
    # "use_ssl": True,
    # "verify_certs": False,
    # "ca_certs": False,
}
es_handle = create_connection(**ES_PALARM_DICT)
1.2 基本使用
增
from datetime import datetime

# Settings and mappings used to create the "test_1000" index.
test_1000_index = {
    "settings": {
        "number_of_shards": 3,
        "number_of_replicas": 2,
        "analysis": {
            "analyzer": {
                # Custom analyzer: standard tokenizer plus a lowercase filter.
                "caseSensitive": {
                    "filter": "lowercase",
                    "type": "custom",
                    "tokenizer": "standard",
                }
            }
        },
    },
    "mappings": {
        "dynamic": False,
        "properties": {
            "name": {"type": "keyword"},
            "age": {"type": "integer"},
            # "city" is analyzed with the custom analyzer at index and search time.
            "city": {
                "type": "text",
                "analyzer": "caseSensitive",
                "search_analyzer": "caseSensitive",
            },
            "update_time": {"type": "date", "ignore_malformed": True},  # update time
            "create_time": {"type": "date", "ignore_malformed": True},  # creation time
        },
    },
}

# Create the index first, then insert one sample document into it.
es_handle.indices.create(index="test_1000", body=test_1000_index)

# Both timestamps are taken from the local clock when the document is built.
sample_doc = {
    "name": "maomao",
    "age": 22,
    "city": "chengdu",
    "update_time": datetime.now(),
    "create_time": datetime.now(),
}
es_handle.index(index="test_1000", doc_type="_doc", body=sample_doc)
改
下面更新示例中使用的 id 是 es 自动生成的;也可以根据需求,在创建数据的时候指定 id。
# Example update bodies. Each assignment replaces the previous one, and the
# update call below passes its own inline body rather than `doc_body`.

# Remove a field with a script
doc_body = dict(script="ctx._source.remove('age')")
# Add a field
doc_body = dict(script="ctx._source.address = '合肥'")
# Modify some of the fields
doc_body = dict(doc={"last_name": "xiao"})

# Update by ID: partial update that sets "age" to 18.
age_update = {"doc": {"age": 18}}
es_handle.update(
    index="test_1000",
    # doc_type="_doc",
    id="_XLrgHgB19QJoOr4yDuA",
    body=age_update,
)
查
# Fetch data in bulk with scan: collect the distinct "protocol" values of
# every document in the "ys-log" index.
query = Search(using=es_handle, index="ys-log")
protocol = set()
scan_hits = helpers.scan(
    es_handle,
    # helpers.scan expects the request body as a plain dict, so the Search
    # object is serialized explicitly instead of being passed as-is.
    query=query.to_dict(),
    scroll="10m",
    raise_on_error=False,
    preserve_order=True,
    size=10000,
    index="ys-log",
)
for item in scan_hits:
    protocol.add(item["_source"]["protocol"])
print(protocol)

# Plain search on "test_1000": execute it and convert every hit to a dict.
query = Search(using=es_handle, index="test_1000")
result = query.execute()
data = [i.to_dict() for i in result]
print(data)
# Aggregate data with aggs.
# Only documents whose "time" lies between 2020-05-01 and 2020-05-30 are considered.
time_window = {"gte": "2020-05-01", "lte": "2020-05-30"}
query = Search(using=es_handle, index="ys-log").filter("range", time=time_window)

# Terms aggregation on "protocol" (size=10), registered under the name "protocol".
query.aggs.bucket(
    "protocol", "terms", collect_mode="breadth_first", field="protocol", size=10
)
# Cardinality metric on the same field, registered as "protocol_metric".
query.aggs.metric("protocol_metric", "cardinality", field="protocol")

# From here on `query` holds the executed response, not the Search object.
query = query.execute()

# One {"name", "value"} entry per bucket of the terms aggregation.
query_result1 = []
for bucket in query["aggregations"]["protocol"]["buckets"]:
    query_result1.append({"name": bucket["key"], "value": bucket["doc_count"]})
print(query_result1)

# Value reported by the cardinality metric.
query_result2 = query["aggregations"]["protocol_metric"]["value"]
print(query_result2)
删
# Delete by ID
es_handle.delete(index="megacorp", id="3oXEzm4BAZBCZGyZ2R40")
# Example response:
# {'_index': 'megacorp', '_type': '_doc', '_id': '3oXEzm4BAZBCZGyZ2R40',
#  '_version': 2, 'result': 'deleted',
#  '_shards': {'total': 2, 'successful': 1, 'failed': 0},
#  '_seq_no': 3, '_primary_term': 2}

# delete_by_query: delete every document that matches the query; the query
# must be written in valid Query DSL.
query = {"query": {"match": {"first_name": "xiao"}}}
result = es_handle.delete_by_query(index="megacorp", body=query)