python操作elasticsearch7


一. 简介

  通过elasticsearch-dsl模块来操作es。

  版本如下（通过pip安装即可）：

elasticsearch==7.9.1
elasticsearch-dsl==7.3.0

 

1.1 建立es连接

下面是es的连接配置。可以根据需求启用ssl安全连接（对应配置中被注释掉的 `ssl_context`、`scheme`、`use_ssl` 等参数），这里没有启用。

import os
import ssl
from ssl import create_default_context

# Index names used by the application.
WARN_LOG_INDEX = "ys-log"
WARN_EVENT_INDEX = "ys-event"
ATTACKER_INDEX = "ys-attacker"
ORG_INDEX = "ys-org"
VUL_NAME = "vulnerabilitys"
PATCH_NAME = "patchs"
EXPLOIT_NAME = "exploit"
ICS_NAME = "ics"
VUL_DRAW_NAME = "vul_draw_name"

# Elasticsearch cluster settings.
ES_HOST_LIST = ["192.168.99.215"]
ES_PROTOCAL = "https"  # only takes effect if "scheme" below is re-enabled
ES_PORT = 9200
# Credentials: see es-docker-compose\certificates\.env.
# They can be overridden via the ES_USER / ES_PWD environment variables so the
# real password does not have to be committed; the literals are fallbacks.
ES_UESR = os.environ.get("ES_USER", "elastic")
ES_PWD = os.environ.get("ES_PWD", "d@#%1saE2sadc")

# Copied from es-docker-compose\certificates\ca\ca.crt
ES_CA = r"""-----BEGIN CERTIFICATE-----
MIIDSzCCAjOgAwIBAgIUbJ+EDMygpWJZqmLmCuL+5+y6FikwDQYJKoZIhvcNAQEL
BQAwNDEyMDAGA1UEAxMpRWxhc3RpYyBDZXJ0aWZpY2F0ZSBUb29sIEF1dG9nZW5l
cmF0ZWQgQ0EwIBcNMjAwNTIyMDI1NTI2WhgPMjIxNzA3MDkwMjU1MjZaMDQxMjAw
BgNVBAMTKUVsYXN0aWMgQ2VydGlmaWNhdGUgVG9vbCBBdXRvZ2VuZXJhdGVkIENB
MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAqtwhFfElRxjkmUXk729f
Ge842YABLSNskuPwB3oBwYt2WYG5r6OTUP2yMel4hFToYm48xfT0tQy/NkeMgi4x
EMHJbS7VHO/rCVMU2YW71cDPvT+XBgkudgmLD+PE91SB1cPQcJnX8AWSO3WkezrZ
cQACIuPyfEHsafVyAlNdyAfYpd+Qdr6CpMiE9pAnh9x0B7wqSAYuZpeaFyb0avgx
vsXUZYKzlIKql1k7m3owXGFZ1FluWzvpqRhBzsH5q8sHbSpr2LcnlTzR7lamZfPE
kp1xpsW++49NsRuT8S04GfPXnuKUto2JE+0Zf98YjQ5Elrmgq/Wa0VuUwQw9jOpy
UwIDAQABo1MwUTAdBgNVHQ4EFgQUmDMEiCUrO+BSqdfbpdhTotlCBSowHwYDVR0j
BBgwFoAUmDMEiCUrO+BSqdfbpdhTotlCBSowDwYDVR0TAQH/BAUwAwEB/zANBgkq
hkiG9w0BAQsFAAOCAQEAZ2W9EasnjyNMRrZap8wSsPOEljs+RkWHpWPWVUP5WTN2
x/tcMPrECTTYteyXsvlbx8pwl+Op2s0RTcqgG9MtxcEPk80Qo665TU3pc4AZPcux
lfEaqM3Jq6DbviJTn3iCi811V2V/bj8UK7mchquW3qeGJcUnQrioqaqAJptHGzg9
uKnIe4FeQcte8RUz4pUNcc0q/dHXMZir/nCdWsaWMcywMGHtetUUtMJ/wigL5lig
hlVxgh2REDxFD6h9Mbh8L8R/i/Zb2io3kl4jXeTlZeqHuBlNH1dRrPwgG7DMDqBU
OitsVO3ZN3I7YfWpgxDjQeOl1XlDjRICZlSqKT0Oow==
-----END CERTIFICATE-----"""

# Set to True to actually verify the server certificate against ES_CA.
# With the default (False) the CA above is loaded but never consulted.
ES_VERIFY_CERTS = False

context = create_default_context(cadata=ES_CA)
if not ES_VERIFY_CERTS:
    # check_hostname must be turned off before verify_mode can be set to
    # CERT_NONE; the ssl module raises ValueError otherwise.
    context.check_hostname = False
    context.verify_mode = ssl.CERT_NONE

# NOTE(review): `disable_warnings` and `create_connection` are not imported in
# this snippet; presumably urllib3.disable_warnings and
# elasticsearch_dsl.connections.create_connection — confirm.
disable_warnings()
ES_PALARM_DICT = {
    "hosts": ES_HOST_LIST,
    "sniff_on_start": False,  # do not discover cluster nodes at startup
    "sniff_timeout": 60,  # timeout for sniffing requests
    "timeout": 300,  # per-request timeout, in seconds
    "http_auth": (ES_UESR, ES_PWD),
    # To connect over SSL, enable "ssl_context" and "scheme" (and/or the
    # use_ssl / verify_certs options below).
    # "ssl_context": context,
    "port": ES_PORT,
    # "scheme": ES_PROTOCAL,
    # "use_ssl": True,
    # "verify_certs": False,
    # "ca_certs": False,
}

es_handle = create_connection(**ES_PALARM_DICT)

1.2 基本使用

from datetime import datetime, timezone

# Settings and mappings for the demo index "test_1000".
test_1000_index = {
    "settings": {
        "number_of_shards": 3,
        # NOTE(review): 2 replicas need at least 3 data nodes; on a single-node
        # cluster the replicas stay unassigned and the index health is yellow.
        "number_of_replicas": 2,
        "analysis": {
            "analyzer": {
                # Despite its name, this analyzer applies the "lowercase"
                # filter, so matching on fields that use it is case-INsensitive.
                "caseSensitive": {
                    "filter": "lowercase",
                    "type": "custom",
                    "tokenizer": "standard",
                }
            }
        },
    },
    "mappings": {
        # Fields not listed below are kept in _source but not indexed.
        "dynamic": False,
        "properties": {
            "name": {"type": "keyword"},  # exact-match string
            "age": {"type": "integer"},
            "city": {
                "type": "text",
                "analyzer": "caseSensitive",
                "search_analyzer": "caseSensitive",
            },  # full-text field
            "update_time": {"type": "date", "ignore_malformed": True},  # last update time
            "create_time": {"type": "date", "ignore_malformed": True},  # creation time
        },
    },
}

# indices.create fails if the index already exists; skip it so the snippet
# can be re-run.
if not es_handle.indices.exists(index="test_1000"):
    es_handle.indices.create(index="test_1000", body=test_1000_index)

# Take a single timestamp so create_time and update_time are identical.
# A timezone-aware value is serialized with its UTC offset; a naive
# datetime.now() would carry no offset and Elasticsearch would read the
# local wall-clock time as UTC.
now = datetime.now(timezone.utc)
created = es_handle.index(
    index="test_1000",
    # doc_type is omitted: Elasticsearch 7 uses the single "_doc" type.
    body={
        "name": "maomao",
        "age": 22,
        "city": "chengdu",
        "update_time": now,
        "create_time": now,
    },
)
# No id was passed, so Elasticsearch generated one; pass id=... to choose it.
print(created["_id"])

上面写入文档时没有指定ID，ES会自动生成（即返回结果中的 `_id`）；也可以根据需求，在创建数据时通过 `id` 参数指定ID。

 
 
# Update a document by ID. The request body can take several forms:
#   Remove a field with a script:
#       doc_body = {'script': "ctx._source.remove('age')"}
#   Add a field with a script:
#       doc_body = {'script': "ctx._source.address = '合肥'"}
#   Partially update fields:
#       doc_body = {"doc": {"last_name": "xiao"}}
# The id below is an auto-generated one from the author's data; replace it
# with the _id of your own document. doc_type is not needed in ES 7.
es_handle.update(
    index="test_1000",
    id="_XLrgHgB19QJoOr4yDuA",
    body={"doc": {"age": 18}},
)

 

## Fetch every matching document in batches with the scroll-based scan helper.
# Only the "protocol" field is used, so restrict _source to it.
query = Search(using=es_handle, index=WARN_LOG_INDEX).source(["protocol"])
protocol = set()

for item in helpers.scan(
    es_handle,
    # helpers.scan expects a plain request body, so serialize the Search.
    query=query.to_dict(),
    scroll="10m",
    # NOTE: shard failures are ignored instead of raised.
    raise_on_error=False,
    # preserve_order stays at its default (False): results go into a set, so
    # order is irrelevant, and scan can use the cheap "_doc" sort.
    size=10000,
    index=WARN_LOG_INDEX,
):
    value = item.get("_source", {}).get("protocol")
    # Documents without the field would otherwise raise KeyError.
    if value is not None:
        protocol.add(value)
print(protocol)

# Plain search. execute() returns only the first page of hits
# (10 by default), not every document in the index.
query = Search(using=es_handle, index="test_1000")
result = query.execute()
data = [hit.to_dict() for hit in result.hits]
print(data)

# Aggregate the log index by protocol within a date range.
query = Search(using=es_handle, index=WARN_LOG_INDEX)
# NOTE(review): the upper bound is 2020-05-30, so May 31 is not covered —
# confirm whether the whole month was intended.
query = query.filter(
    "range",
    time={
        "gte": "2020-05-01",
        "lte": "2020-05-30",
    },
)
# Only the aggregations are read, so don't fetch any hits.
query = query.extra(size=0)

# Top 10 protocols by document count. collect_mode only matters when
# sub-aggregations are present; it is kept as in the original.
query.aggs.bucket(
    "protocol",
    "terms",
    collect_mode="breadth_first",
    field="protocol",
    size=10,
)
# Number of distinct protocols (cardinality is an approximate count).
query.aggs.metric("protocol_metric", "cardinality", field="protocol")

# Keep the Search (query) and its Response (response) in separate names.
response = query.execute()

query_result1 = [
    {"name": bucket.key, "value": bucket.doc_count}
    for bucket in response.aggregations.protocol.buckets
]
print(query_result1)

query_result2 = response.aggregations.protocol_metric.value
print(query_result2)

删除

# Delete a single document by ID.
es_handle.delete(index='megacorp', id='3oXEzm4BAZBCZGyZ2R40')
# Example response:
# {'_index': 'megacorp',
#  '_type': '_doc',
#  '_id': '3oXEzm4BAZBCZGyZ2R40',
#  '_version': 2,
#  'result': 'deleted',
#  '_shards': {'total': 2, 'successful': 1, 'failed': 0},
#  '_seq_no': 3,
#  '_primary_term': 2}

# delete_by_query: delete every document matching the query; the request
# body must be valid Query DSL.
query = {
    "query": {
        "match": {
            "first_name": "xiao"
        }
    }
}
result = es_handle.delete_by_query(index="megacorp", body=query)

 


免责声明!

本站转载的文章为个人学习借鉴使用,本站对版权不负任何法律责任。如果侵犯了您的隐私权益,请联系本站邮箱yoyou2525@163.com删除。



 
粤ICP备18138465号  © 2018-2025 CODEPRJ.COM