elasticsearch簡介和elasticsearch_dsl


elasticsearch

es是基於lucene分片(shard)存儲的近實時的分布式搜索引擎

名詞解釋:
Lucene:使用java語言編寫的存儲與查詢框架,通過組織文檔與文本關系信息進行倒排索引,內部形成多個segment段進行存儲,是es的核心組件,但不具備分布式能力。

segment:Lucene內部最小的存儲單元,也是es的最小存儲單元,多個小segment可合為一個較大的segment,並但不能拆分。

shard:es為解決海量數據的處理能力,在Lucene之上設計了分片的概念,每個分片存儲部分數據,分片可以設置多個副本,通過內部routing算法將數據路由到各個分片上,以支持分布式存儲與查詢。

近實時:嚴格講es並不是索引即可見的數據庫,首先數據會被寫入主分片所在機器的內存中,再觸發flush操作,形成一個新的segment數據段,只有flush到磁盤的數據才會被異步拉取到其它副本節點,如果本次搜索命中副本節點且數據沒有同步的話,那么是不會被檢索到的;es默認flush間隔是1s,也可通過修改refresh_interval參數來調整間隔(為提升性能和體驗,一版設置30s-60s)。

分布式:es天生支持分布式,配置與使用上與單機版基本沒什么區別,可快速擴張至上千台集群規模、支持PB級數據檢索;通過內部路由算法將數據儲存到不同節點的分片上;當用戶發起一次查詢時,首先會在各個分片上完成提前批處理(這個會在之后章節詳細講解),處理后的數據匯總到請求節點再做一次全局處理后返回。

當然,也有人將es定義為開箱即用的NoSql文檔數據庫,這么說也沒錯,es借助其平滑擴展的能力實現了nosql數據庫對海量數據增刪改查的能力,目前市面上基於文檔存儲的nosql數據庫有MongoDB、couchbase、es、orientdb等,幾種數據庫都有其各自的使用場景,其中es與MongoDB備受國內開發人員青睞,社區活躍度極高,尤其es

ElasticSearch與數據庫的對應關系(7.0之前)

ES RDBS
index database
type table
filed column

通過Python庫elasticsearch_dsl處理ElasticSearch

 

二、Mapping&Setting

眾所周知完成一個索引庫的創建需要配置mapping與setting兩部分;

mapping:

常用數據類型:text、keyword、number、array、range、boolean、date、geo_point、ip、nested、object

 text:默認會進行分詞,支持模糊查詢(5.x之后版本string類型已廢棄,請大家使用text)。
 keyword:不進行分詞;keyword類型默認開啟doc_values來加速聚合排序操作,占用了大量磁盤io 如非必須可以禁用doc_values。
 number:如果只有過濾場景 用不到range查詢的話,使用keyword性能更佳,另外數字類型的doc_values比字符串更容易壓縮。
 array:es不需要顯示定義數組類型,只需要在插入數據時用'[]'表示即可,'[]'中的元素類型需保持一致。
 range:對數據的范圍進行索引;目前支持 number range、date range 、ip range。
 boolean: 只接受true、false 也可以是字符串類型的“true”、“false”
 date:支持毫秒、根據指定的format解析對應的日期格式,內部以long類型存儲。
 geo_point:存儲經緯度數據對。
 ip:將ip數據存儲在這種數據類型中,方便后期對ip字段的模糊與范圍查詢。
 nested:嵌套類型,一種特殊的object類型,存儲object數組,可檢索內部子項。
 object:嵌套類型,不支持數組。

es7.0新增數據類型:alias、date_nanos、features、vector

alias:並不實際存在,而是對已有字段的一種別名映射,搜索該字段與搜索實際字段返回的內容沒有本質的區別。
date_nanos:另一種時間類型,可精確到納秒,用法類似date。
features:用來存儲特征向量,數據不能為0和負數,查詢時只能使用rank_feature query,該字段主要為支持后續機器學習相關功能做准備。
vector:存儲特征數組,支持稀疏與稠密向量存儲,該字段主要為支持后續機器學習相關功能做准備。

doc_values:列式存儲,為支持快速聚合與排序場景而設計,不在該類場景的可禁用

 "user_id": { 
          "type": "keyword",
          "doc_values": false
        } 

index:控制字段索引方式

    analyzed:先分詞再索引
    not_analyzed:不分詞直接索引
    no:不被索引

ignore_malformed:是否忽略臟數據

ignore_malformed設置為true,如果遇到數據格式或類型錯誤數據將被忽略,其它字段會正常插入
如果設置為false,一旦數據不符合要求整個文檔將被拒絕。

_source:不需要回顯數據內容的可選擇禁用該字段

  "_source": {
        "enabled": false
   } 
需要注意的是禁用該項后將不能支持update和索引庫的reindex操作,需謹慎。

includes&excludes:_source字段黑白名單控制,可控制哪些字段在查詢結果的source中出現

    "_source": {
        "includes": [
          "*.count",
          "meta.*"
        ],
        "excludes": [
          "meta.description",
          "meta.attributes.*"
        ]
      }

dynamic:動態mapping,禁用后將不會自動創建field,但數據仍可以正常插入

"dynamic":"false"

_all:es6.x默認已禁用全文索引,es7.0徹底移除該配置

"_all": {
       "enabled": false
     }

norms:控制該字段是否參與相關度排名計算,如果該字段只做過濾用可禁用該項以提升搜索性能

"categorys": {
      "type": "text",
      "norms": false
    }

index_options:細粒度控制倒排索引方式

docs:只索引文檔id
freqs:文檔id和詞頻,詞頻可用於評分
positions:增加位置信息,位置可用於模糊和短語查詢
offsets:增加偏移量,高亮時會用到偏移量信息

setting:

    "index.max_result_window":20000   #控制查詢返回的最大結果數
    "index.merge.scheduler.max_thread_count": 2   #segment段合並時可使用的最大線程數,為避免過度的io操作,該值一般不超過2
    "index.routing.allocation.total_shards_per_node":10    #分配到單個節點的最大分片數
    "index.refresh_interval":"-1"    #index刷新頻率,頻繁刷新會降低性能,一般設置為30s-60s;-1表示禁用刷新
    "index.translog.durability":"async"    #translog刷新方式,如果對數據安全性要求不算太高,可設置為async以提升性能
    "index.translog.flush_threshold_size":"1024mb"    #translog刷新字節條件,超過1g才會刷新
    "index.translog.sync_interval":"120s"    #translog刷新時間條件,超過120s才會刷新
    "index.unassigned.node_left.delayed_timeout":"1d"    #當有節點宕機后索引庫多久觸發副本balance操作
    "index.search.slowlog.threshold.query.info":"1s"    #超過1s的查詢會被記錄到慢查詢日志里
    "index.store.type":"niofs"    #網絡通信協議
    "index.number_of_replicas":0    #index分片的副本數
    "index.number_of_shards":8    #index分片數,需要注意的是es7.0默認索引分片數調整為1了
    "index.codec":"best_compression"    #index數據的壓縮方式,best_compression壓縮可節省4-8倍的存儲空間
下面列舉一下elasticsearch的可配置項:
        1. 集群名稱,默認為elasticsearch:
cluster.name: elasticsearch
        2. 節點名稱,es啟動時會自動創建節點名稱,但你也可進行配置:
node.name: "Franz Kafka"
        3. 是否作為主節點,每個節點都可以被配置成為主節點,默認值為true:
node.master: true
        4. 是否存儲數據,即存儲索引片段,默認值為true:
node.data: true
        master和data同時配置會產生一些奇異的效果:
        1) 當master為false,而data為true時,會對該節點產生嚴重負荷;
        2) 當master為true,而data為false時,該節點作為一個協調者;
        3) 當master為false,data也為false時,該節點就變成了一個負載均衡器。
        你可以通過連接http://localhost:9200/_cluster/health或者http://localhost:9200/_cluster/nodes,或者使用插件http://github.com/lukas-vlcek/bigdesk或http://mobz.github.com/elasticsearch-head來查看集群狀態。
        5. 每個節點都可以定義一些與之關聯的通用屬性,用於后期集群進行碎片分配時的過濾:
node.rack: rack314
        6. 默認情況下,多個節點可以在同一個安裝路徑啟動,如果你想讓你的es只啟動一個節點,可以進行如下設置:
node.max_local_storage_nodes: 1
        7. 設置一個索引的碎片數量,默認值為5:
index.number_of_shards: 5
        8. 設置一個索引可被復制的數量,默認值為1:
index.number_of_replicas: 1
        當你想要禁用公布式時,你可以進行如下設置:
index.number_of_shards: 1
index.number_of_replicas: 0
        這兩個屬性的設置直接影響集群中索引和搜索操作的執行。假設你有足夠的機器來持有碎片和復制品,那么可以按如下規則設置這兩個值:
        1) 擁有更多的碎片可以提升索引執行能力,並允許通過機器分發一個大型的索引;
        2) 擁有更多的復制器能夠提升搜索執行能力以及集群能力。
        對於一個索引來說,number_of_shards只能設置一次,而number_of_replicas可以使用索引更新設置API在任何時候被增加或者減少。
        ElasticSearch關注加載均衡、遷移、從節點聚集結果等等。可以嘗試多種設計來完成這些功能。
        可以連接http://localhost:9200/A/_status來檢測索引的狀態。
        9. 配置文件所在的位置,即elasticsearch.yml和logging.yml所在的位置:
path.conf: /path/to/conf
        10. 分配給當前節點的索引數據所在的位置:
path.data: /path/to/data
        可以可選擇的包含一個以上的位置,使得數據在文件級別跨越位置,這樣在創建時就有更多的自由路徑,如:
path.data: /path/to/data1,/path/to/data2
        11. 臨時文件位置:
path.work: /path/to/work
        12. 日志文件所在位置:
path.logs: /path/to/logs
        13. 插件安裝位置:
path.plugins: /path/to/plugins
        14. 插件托管位置,若列表中的某一個插件未安裝,則節點無法啟動:
plugin.mandatory: mapper-attachments,lang-groovy
        15. JVM開始交換時,ElasticSearch表現並不好:你需要保障JVM不進行交換,可以將bootstrap.mlockall設置為true禁止交換:
bootstrap.mlockall: true
        請確保ES_MIN_MEM和ES_MAX_MEM的值是一樣的,並且能夠為ElasticSearch分配足夠的內在,並為系統操作保留足夠的內存。
        16. 默認情況下,ElasticSearch使用0.0.0.0地址,並為http傳輸開啟9200-9300端口,為節點到節點的通信開啟9300-9400端口,也可以自行設置IP地址:
network.bind_host: 192.168.0.1
        17. publish_host設置其他節點連接此節點的地址,如果不設置的話,則自動獲取,publish_host的地址必須為真實地址:
network.publish_host: 192.168.0.1
        18. bind_host和publish_host可以一起設置:
network.host: 192.168.0.1
        19. 可以定制該節點與其他節點交互的端口:
transport.tcp.port: 9300
        20. 節點間交互時,可以設置是否壓縮,轉為為不壓縮:
transport.tcp.compress: true
        21. 可以為Http傳輸監聽定制端口:
http.port: 9200
        22. 設置內容的最大長度:
http.max_content_length: 100mb
        23. 禁止HTTP
http.enabled: false
        24. 網關允許在所有集群重啟后持有集群狀態,集群狀態的變更都會被保存下來,當第一次啟用集群時,可以從網關中讀取到狀態,默認網關類型(也是推薦的)是local:
gateway.type: local
        25. 允許在N個節點啟動后恢復過程:
gateway.recover_after_nodes: 1
        26. 設置初始化恢復過程的超時時間:
gateway.recover_after_time: 5m
        27. 設置該集群中可存在的節點上限:
gateway.expected_nodes: 2
        28. 設置一個節點的並發數量,有兩種情況,一種是在初始復蘇過程中:
cluster.routing.allocation.node_initial_primaries_recoveries: 4
        另一種是在添加、刪除節點及調整時:
cluster.routing.allocation.node_concurrent_recoveries: 2
        29. 設置復蘇時的吞吐量,默認情況下是無限的:
indices.recovery.max_size_per_sec: 0
        30. 設置從對等節點恢復片段時打開的流的數量上限:
indices.recovery.concurrent_streams: 5
        31. 設置一個集群中主節點的數量,當多於三個節點時,該值可在2-4之間:
discovery.zen.minimum_master_nodes: 1
        32. 設置ping其他節點時的超時時間,網絡比較慢時可將該值設大:
discovery.zen.ping.timeout: 3s
http://elasticsearch.org/guide/reference/modules/discovery/zen.html上有更多關於discovery的設置。
        33. 禁止當前節點發現多個集群節點,默認值為true:
discovery.zen.ping.multicast.enabled: false
        34. 設置新節點被啟動時能夠發現的主節點列表:
discovery.zen.ping.unicast.hosts: ["host1", "host2:port", "host3[portX-portY]"]

 

//--------------------------------------------------------------------------------------------

 

gateway類型,表示持久化數據存放位置,默認local,推薦的方式,此外還有NFS、HDFS、S3
gateway.type : local
#集群名稱,區分集群的唯一名稱
cluster.name : 'TEST'
 
#索引文件存放目錄
#path.data : '/var/elasticsearch/data'
#日志文件存放目錄
#path.logs : '/var/elasticsearch/logs'
 
#網絡配置
#network.tcp.keep_alive : true
#network.tcp.send_buffer_size : 8192
#network.tcp.receive_buffer_size : 8192
#gateway.recover_after_nodes : 1
#gateway.recover_after_time : 10s
#gateway.expected_nodes : 2
 
#自動發現相關配置
#discovery.zen.fd.connect_on_network_disconnect : true
#discovery.zen.initial_ping_timeout : 10s
#discovery.zen.fd.ping_interval : 2s
#discovery.zen.fd.ping_retries  : 10
 
#索引snapshot時間只對當gateway設置為NFS時有效
#index.gateway.snapshot_interval : 1s
#刷新時間間隔
#index.engine.robin.refresh_interval : -1
 
#默認索引碎片數
index.number_of_shards : 3
#默認索引副本數
index.number_of_replicas : 1
 
#默認索引合並因子
#index.merge.policy.merge_factor : 100
#index.merge.policy.min_merge_docs : 1000
#index.merge.policy.use_compound_file : true
#indices.memory.index_buffer_size : 5%
 
#Gateway相關配置
# Gateway Settings
#gateway:
#  recover_after_nodes: 1
#  recover_after_time: 5m
#  expected_nodes: 2
#提示:當集群期望節點達不到的時候,集群就會處於block,無法正常索引和查詢,說明集群中某個節點未能正常啟動,這正是我們期望的效果,block住,避免照成數據的不一致
 
#強制所有內存鎖定,不要沒事搞個swap什么的來影響性能
# Force all memory to be locked, forcing JVM to never swap
#  (make sure to set MIN and MAX mem to the same value)
#bootstrap:
#  mlockall: true
 
 
#當禁用multcast廣播的時候,可以手動設置集群的節點ip
# Unicast Discovery (disable multicast)
#discovery:
#  zen:
#    multicast.enabled: false
#    unicast.hosts: ["host1", "host2"]
--------------------------------------------------------------------------------
默認配置為:節點每隔1s同master發送1次心跳,超時時間為30s,測試次數為3次,超過3次,則認為該節點同master已經脫離了。以上為elasticsearch的默認配置。在實際生產環境中,每隔1s,太頻繁了,會產生太多網絡流量。我們可以在elasticsearch.yml如下修改。 

discovery.zen.fd.ping_timeout: 120s  
discovery.zen.fd.ping_retries: 6  
discovery.zen.fd.ping_interval: 30s  
擴展配置

三、elasticsearch7.0有哪些重大改進

1、徹底廢棄多type支持,包括api層面,之前版本可在一個索引庫下創建多個type。

2、徹底廢棄_all字段支持,為提升性能默認不再支持全文檢索,即7.0之后版本進行該項配置會報錯。

3、新增應用程序主動監測功能,搭配對應的kibana版本,用戶可監測應用服務的健康狀態,並在出現問題后及時發出通知。

4、取消query結果中hits count的支持(聚合查詢除外),使得查詢性能大幅提升(3x-7x faster)。這意味着,每次查詢后將不能得到精確的結果集數量。

5、新增intervals query ,用戶可設置多字符串在文檔中出現的先后順序進行檢索。

6、新增script_core ,通過此操作用戶可以精確控制返回結果的score分值。

7、優化集群協調子系統,縮減配置項提升穩定性。

8、新增 alias、date_nanos、features、vector等數據類型。

9、7.0自帶java環境,所以我們在安裝es時不再需要單獨下載和配置java_home。

10、7.0將不會再有OOM的情況,JVM引入了新的circuit breaker(熔斷)機制,當查詢或聚合的數據量超出單機處理的最大內存限制時會被截斷,並拋出異常(有點類似clickhouse)。

11、豐富多彩的kibana功能。

四、python api elasticsearch_dsl的使用

import datetime
from elasticsearch_dsl import Document, Date, Nested, InnerDoc, Keyword, Integer, Long
from elasticsearch_dsl import Search
from elasticsearch import Elasticsearch
from elasticsearch_dsl.connections import create_connection

class AppInfo(InnerDoc):
id = Keyword()
type = Keyword()
rank = Keyword()


class TemplateModel(Document):
keyword_id = Integer()
country_id = Integer()
hint = Integer()
keyword = Keyword()
search_count = Integer()
appstoreList = Nested(AppInfo)


class KeywordSearch(TemplateModel):
class Index:
# name = "index_name" # pass
settings = {
'number_of_shards': 5, # 分片
'number_of_replicas': 1, # 副本備份
'max_result_window': 20000, # 默認查詢數量
'refresh_interval': "30s",
# "translog": {"sync_interval": "15s", "durability": "async"}
}


class KeywordToApp(Document):
keyword_id = Keyword()
country_id = Keyword()
id = Keyword()
keyword = Keyword()
hint = Integer()
search_count = Integer()
app_type = Integer()
rank = Integer()
de = Integer()

class Index:
# name = "index_name" # pass
settings = {
'number_of_shards': 5, # 分片
'number_of_replicas': 1, # 副本備份
'max_result_window': 20000, # 默認查詢數量
'refresh_interval': "30s",
# "translog": {"sync_interval": "15s", "durability": "async"}
}

添加連接

client = Elasticsearch('127.0.0.1:9190')  # 連接
print(client) # <Elasticsearch([{'host': '127.0.0.1', 'port': 9190}])>

client = create_connection(alias="alias_test", hosts=["127.0.0.1:9190"]) # 使用別名
print(KeywordSearch._get_connection(using="alias_test", )) # <Elasticsearch([{'host': '127.0.0.1', 'port': 9190}])>
# elasticsearch_dsl是高級模塊 elasticsearch是低級模塊
# elasticsearch_dsl 基於 elasticsearch


client = Elasticsearch('127.0.0.1', http_auth=('root', 'password'), timeout=3600)  # 使用密碼認證

查看index詳情

    indexs = client.indices.get('*')  # 獲取所有的index詳情
    print(indexs)
    print(indexs.keys())  # 所有index的名字
    index = list(indexs.keys())[0]
    print(index)  # 獲取第一個index的名字

創建和刪除index

    KeywordSearch._index._name = f'1_test'  # 賦值model中的index的名字
    print(KeywordSearch._index._name)
    
    # 創建空index 方法1 mapping根據定義filed去生成
    # print(KeywordSearch.init(using="alias_test", ))  # ,使用model遷移創建index以及定義的mapping
    
    # 創建空index  方法2  無mapping
    # KeywordSearch._get_connection(using="alias_test", ).index(index=f'{i}_test')
    
    # 刪除index
    # print(KeywordSearch._get_connection(using="alias_test", ).indices.delete(index= f'1_test')) # 刪除index

寫入更新數據

更新保存數據1
KeywordSearch(_id=1,
keyword_id=1,
country_id=1,
appstoreList=[{"id": 1, "rank": 1}],
create_time=datetime.datetime.now().__format__("%Y-%m-%d %H:%M:%S")
).save(using="alias_test")

# 更新保存數據2
KeywordSearch._get_connection(using="alias_test", ).update(
index=f'1_test',
id=1,
body={"doc": {"keyword_id": 1,
"country_id": 1,
"appstoreList": [{"id": 2, "rank": 2}],
"create_time": datetime.datetime.now().__format__("%Y-%m-%d %H:%M:%S")},
"doc_as_upsert": True # 不存在便插入
}) # 數據更新 "_version": 2

查詢數據

class ElasticSearchUtil:

@classmethod
def InsertDocument(cls, using, index, body, id=None):
'''
插入一條數據body到指定的index,可指定Id,若不指定,會自動生成
'''
return using.index(index=index, body=body, id=id)

@classmethod
def bulkUpdate(cls, using, index, body):
'''
批量插入更新指定index、id對應的數據
'''
action = [{"_op_type": "update",
"_index": index,
"_type": "_doc",
"_id": str(i.get("keyword_id")) + str(i.get("de")),
"doc": i,
"doc_as_upsert": True} for i in body]
return helpers.bulk(using, action, index=index)

@classmethod
def deleteDocByQuery(cls, using, index, query):
'''
刪除idnex下符合條件query的所有數據
:return:
'''
return using.delete_by_query(index=index, body=query, conflicts="proceed", request_timeout=100)

@classmethod
def deleteDocByDeCount(cls, using, index, keyword_id, de_count):
'''
刪除idnex下符合條件query的所有數據
:return:
'''
query = {
"query": {
"bool": {
"filter": [{
"term": {"keyword_id": keyword_id},
},
{"range": {
"de": {"gte": de_count}
}
}
]
}
}
}

return using.delete_by_query(index=index, body=query, conflicts="proceed", request_timeout=100)

@classmethod
def searchDoc(cls, using, index=None, query=None):
'''
查找index下所有符合條件的數據
'''
return using.search(index=index, body=query, request_timeout=300)

@classmethod
def getDocById(cls, using, index, id):
'''
獲取指定index、id對應的數據
'''
return using.get(index=index, id=id)

@classmethod
def updateDocById(cls, using, index, id, body=None):
'''
更新id所對應的數據
'''
return using.update(index=index, id=id, body=body)

@classmethod
def updateDocByQuery(cls, using, index, query):
'''
批量更新 符合該條件的批量更改hint字段
query:
'''
return using.update_by_query(index=index, body=query, request_timeout=60)

@classmethod
def insertBulk(cls, using, index, body=None):
'''
批量插入doc
'''
return using.bulk(index=index, body=body, request_timeout=60)
if __name__ == '__main__':
    # 批量刪除上一次的數據
    query = {'query': {'match': {'keyword_id': item.get("keyword_id")}}}
    ElasticSearchUtil.deleteDocByQuery(KeywordToApp._get_connection(using="search_1"),
                                       KeywordToApp._index._name,
                                       query=query)
    # 批量更新新的數據
    if key_app:
        ElasticSearchUtil.insertBulk(KeywordToApp._get_connection(using="search_2"),
                                     KeywordToApp._index._name,
                                     key_app
                                     )

五、遇到的問題

1.默認node最大分片數是1000

如果單個node想要創建更多的index和分片

{"Content-Type":"application/json"}
PUT
39.105.220.74:9190/_cluster/settings
{
  "transient": {
    "cluster": {
      "max_shards_per_node": 10000
    }
  }
}

 2.解決丟失分片。又yellow變gree

POST
http://180.76.153.235:9190/_cluster/reroute?retry_failed=true
{"Content-Type":"application/json"}

 

 

 

 

  


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM