elasticsearch
es是基於lucene分片(shard)存儲的近實時的分布式搜索引擎
名詞解釋:
Lucene:使用java語言編寫的存儲與查詢框架,通過組織文檔與文本關系信息進行倒排索引,內部形成多個segment段進行存儲,是es的核心組件,但不具備分布式能力。
segment:Lucene內部最小的存儲單元,也是es的最小存儲單元,多個小segment可合為一個較大的segment,並但不能拆分。
shard:es為解決海量數據的處理能力,在Lucene之上設計了分片的概念,每個分片存儲部分數據,分片可以設置多個副本,通過內部routing算法將數據路由到各個分片上,以支持分布式存儲與查詢。
近實時:嚴格講es並不是索引即可見的數據庫,首先數據會被寫入主分片所在機器的內存中,再觸發flush操作,形成一個新的segment數據段,只有flush到磁盤的數據才會被異步拉取到其它副本節點,如果本次搜索命中副本節點且數據沒有同步的話,那么是不會被檢索到的;es默認flush間隔是1s,也可通過修改refresh_interval參數來調整間隔(為提升性能和體驗,一版設置30s-60s)。
分布式:es天生支持分布式,配置與使用上與單機版基本沒什么區別,可快速擴張至上千台集群規模、支持PB級數據檢索;通過內部路由算法將數據儲存到不同節點的分片上;當用戶發起一次查詢時,首先會在各個分片上完成提前批處理(這個會在之后章節詳細講解),處理后的數據匯總到請求節點再做一次全局處理后返回。
當然,也有人將es定義為開箱即用的NoSql文檔數據庫,這么說也沒錯,es借助其平滑擴展的能力實現了nosql數據庫對海量數據增刪改查的能力,目前市面上基於文檔存儲的nosql數據庫有MongoDB、couchbase、es、orientdb等,幾種數據庫都有其各自的使用場景,其中es與MongoDB備受國內開發人員青睞,社區活躍度極高,尤其es
ElasticSearch與數據庫的對應關系(7.0之前)
ES | RDBS |
---|---|
index | database |
type | table |
filed | column |
通過Python庫elasticsearch_dsl處理ElasticSearch
二、Mapping&Setting
眾所周知完成一個索引庫的創建需要配置mapping與setting兩部分;
mapping:
常用數據類型:text、keyword、number、array、range、boolean、date、geo_point、ip、nested、object
text:默認會進行分詞,支持模糊查詢(5.x之后版本string類型已廢棄,請大家使用text)。 keyword:不進行分詞;keyword類型默認開啟doc_values來加速聚合排序操作,占用了大量磁盤io 如非必須可以禁用doc_values。 number:如果只有過濾場景 用不到range查詢的話,使用keyword性能更佳,另外數字類型的doc_values比字符串更容易壓縮。 array:es不需要顯示定義數組類型,只需要在插入數據時用'[]'表示即可,'[]'中的元素類型需保持一致。 range:對數據的范圍進行索引;目前支持 number range、date range 、ip range。 boolean: 只接受true、false 也可以是字符串類型的“true”、“false” date:支持毫秒、根據指定的format解析對應的日期格式,內部以long類型存儲。 geo_point:存儲經緯度數據對。 ip:將ip數據存儲在這種數據類型中,方便后期對ip字段的模糊與范圍查詢。 nested:嵌套類型,一種特殊的object類型,存儲object數組,可檢索內部子項。 object:嵌套類型,不支持數組。
es7.0新增數據類型:alias、date_nanos、features、vector
alias:並不實際存在,而是對已有字段的一種別名映射,搜索該字段與搜索實際字段返回的內容沒有本質的區別。
date_nanos:另一種時間類型,可精確到納秒,用法類似date。
features:用來存儲特征向量,數據不能為0和負數,查詢時只能使用rank_feature query,該字段主要為支持后續機器學習相關功能做准備。
vector:存儲特征數組,支持稀疏與稠密向量存儲,該字段主要為支持后續機器學習相關功能做准備。
doc_values:列式存儲,為支持快速聚合與排序場景而設計,不在該類場景的可禁用
"user_id": { "type": "keyword", "doc_values": false }
index:控制字段索引方式
analyzed:先分詞再索引
not_analyzed:不分詞直接索引
no:不被索引
ignore_malformed:是否忽略臟數據
ignore_malformed設置為true,如果遇到數據格式或類型錯誤數據將被忽略,其它字段會正常插入
如果設置為false,一旦數據不符合要求整個文檔將被拒絕。
_source:不需要回顯數據內容的可選擇禁用該字段
"_source": { "enabled": false }
需要注意的是禁用該項后將不能支持update和索引庫的reindex操作,需謹慎。
includes&excludes:_source字段黑白名單控制,可控制哪些字段在查詢結果的source中出現
"_source": { "includes": [ "*.count", "meta.*" ], "excludes": [ "meta.description", "meta.attributes.*" ] }
dynamic:動態mapping,禁用后將不會自動創建field,但數據仍可以正常插入
"dynamic":"false"
_all:es6.x默認已禁用全文索引,es7.0徹底移除該配置
"_all": { "enabled": false }
norms:控制該字段是否參與相關度排名計算,如果該字段只做過濾用可禁用該項以提升搜索性能
"categorys": { "type": "text", "norms": false }
index_options:細粒度控制倒排索引方式
docs:只索引文檔id
freqs:文檔id和詞頻,詞頻可用於評分
positions:增加位置信息,位置可用於模糊和短語查詢
offsets:增加偏移量,高亮時會用到偏移量信息
setting:
"index.max_result_window":20000 #控制查詢返回的最大結果數 "index.merge.scheduler.max_thread_count": 2 #segment段合並時可使用的最大線程數,為避免過度的io操作,該值一般不超過2 "index.routing.allocation.total_shards_per_node":10 #分配到單個節點的最大分片數 "index.refresh_interval":"-1" #index刷新頻率,頻繁刷新會降低性能,一般設置為30s-60s;-1表示禁用刷新 "index.translog.durability":"async" #translog刷新方式,如果對數據安全性要求不算太高,可設置為async以提升性能 "index.translog.flush_threshold_size":"1024mb" #translog刷新字節條件,超過1g才會刷新 "index.translog.sync_interval":"120s" #translog刷新時間條件,超過120s才會刷新 "index.unassigned.node_left.delayed_timeout":"1d" #當有節點宕機后索引庫多久觸發副本balance操作 "index.search.slowlog.threshold.query.info":"1s" #超過1s的查詢會被記錄到慢查詢日志里 "index.store.type":"niofs" #網絡通信協議 "index.number_of_replicas":0 #index分片的副本數 "index.number_of_shards":8 #index分片數,需要注意的是es7.0默認索引分片數調整為1了 "index.codec":"best_compression" #index數據的壓縮方式,best_compression壓縮可節省4-8倍的存儲空間

下面列舉一下elasticsearch的可配置項: 1. 集群名稱,默認為elasticsearch: cluster.name: elasticsearch 2. 節點名稱,es啟動時會自動創建節點名稱,但你也可進行配置: node.name: "Franz Kafka" 3. 是否作為主節點,每個節點都可以被配置成為主節點,默認值為true: node.master: true 4. 是否存儲數據,即存儲索引片段,默認值為true: node.data: true master和data同時配置會產生一些奇異的效果: 1) 當master為false,而data為true時,會對該節點產生嚴重負荷; 2) 當master為true,而data為false時,該節點作為一個協調者; 3) 當master為false,data也為false時,該節點就變成了一個負載均衡器。 你可以通過連接http://localhost:9200/_cluster/health或者http://localhost:9200/_cluster/nodes,或者使用插件http://github.com/lukas-vlcek/bigdesk或http://mobz.github.com/elasticsearch-head來查看集群狀態。 5. 每個節點都可以定義一些與之關聯的通用屬性,用於后期集群進行碎片分配時的過濾: node.rack: rack314 6. 默認情況下,多個節點可以在同一個安裝路徑啟動,如果你想讓你的es只啟動一個節點,可以進行如下設置: node.max_local_storage_nodes: 1 7. 設置一個索引的碎片數量,默認值為5: index.number_of_shards: 5 8. 設置一個索引可被復制的數量,默認值為1: index.number_of_replicas: 1 當你想要禁用公布式時,你可以進行如下設置: index.number_of_shards: 1 index.number_of_replicas: 0 這兩個屬性的設置直接影響集群中索引和搜索操作的執行。假設你有足夠的機器來持有碎片和復制品,那么可以按如下規則設置這兩個值: 1) 擁有更多的碎片可以提升索引執行能力,並允許通過機器分發一個大型的索引; 2) 擁有更多的復制器能夠提升搜索執行能力以及集群能力。 對於一個索引來說,number_of_shards只能設置一次,而number_of_replicas可以使用索引更新設置API在任何時候被增加或者減少。 ElasticSearch關注加載均衡、遷移、從節點聚集結果等等。可以嘗試多種設計來完成這些功能。 可以連接http://localhost:9200/A/_status來檢測索引的狀態。 9. 配置文件所在的位置,即elasticsearch.yml和logging.yml所在的位置: path.conf: /path/to/conf 10. 分配給當前節點的索引數據所在的位置: path.data: /path/to/data 可以可選擇的包含一個以上的位置,使得數據在文件級別跨越位置,這樣在創建時就有更多的自由路徑,如: path.data: /path/to/data1,/path/to/data2 11. 臨時文件位置: path.work: /path/to/work 12. 日志文件所在位置: path.logs: /path/to/logs 13. 插件安裝位置: path.plugins: /path/to/plugins 14. 插件托管位置,若列表中的某一個插件未安裝,則節點無法啟動: plugin.mandatory: mapper-attachments,lang-groovy 15. JVM開始交換時,ElasticSearch表現並不好:你需要保障JVM不進行交換,可以將bootstrap.mlockall設置為true禁止交換: bootstrap.mlockall: true 請確保ES_MIN_MEM和ES_MAX_MEM的值是一樣的,並且能夠為ElasticSearch分配足夠的內在,並為系統操作保留足夠的內存。 16. 默認情況下,ElasticSearch使用0.0.0.0地址,並為http傳輸開啟9200-9300端口,為節點到節點的通信開啟9300-9400端口,也可以自行設置IP地址: network.bind_host: 192.168.0.1 17. publish_host設置其他節點連接此節點的地址,如果不設置的話,則自動獲取,publish_host的地址必須為真實地址: network.publish_host: 192.168.0.1 18. bind_host和publish_host可以一起設置: network.host: 192.168.0.1 19. 可以定制該節點與其他節點交互的端口: transport.tcp.port: 9300 20. 節點間交互時,可以設置是否壓縮,轉為為不壓縮: transport.tcp.compress: true 21. 可以為Http傳輸監聽定制端口: http.port: 9200 22. 設置內容的最大長度: http.max_content_length: 100mb 23. 禁止HTTP http.enabled: false 24. 網關允許在所有集群重啟后持有集群狀態,集群狀態的變更都會被保存下來,當第一次啟用集群時,可以從網關中讀取到狀態,默認網關類型(也是推薦的)是local: gateway.type: local 25. 允許在N個節點啟動后恢復過程: gateway.recover_after_nodes: 1 26. 設置初始化恢復過程的超時時間: gateway.recover_after_time: 5m 27. 設置該集群中可存在的節點上限: gateway.expected_nodes: 2 28. 設置一個節點的並發數量,有兩種情況,一種是在初始復蘇過程中: cluster.routing.allocation.node_initial_primaries_recoveries: 4 另一種是在添加、刪除節點及調整時: cluster.routing.allocation.node_concurrent_recoveries: 2 29. 設置復蘇時的吞吐量,默認情況下是無限的: indices.recovery.max_size_per_sec: 0 30. 設置從對等節點恢復片段時打開的流的數量上限: indices.recovery.concurrent_streams: 5 31. 設置一個集群中主節點的數量,當多於三個節點時,該值可在2-4之間: discovery.zen.minimum_master_nodes: 1 32. 設置ping其他節點時的超時時間,網絡比較慢時可將該值設大: discovery.zen.ping.timeout: 3s http://elasticsearch.org/guide/reference/modules/discovery/zen.html上有更多關於discovery的設置。 33. 禁止當前節點發現多個集群節點,默認值為true: discovery.zen.ping.multicast.enabled: false 34. 設置新節點被啟動時能夠發現的主節點列表: discovery.zen.ping.unicast.hosts: ["host1", "host2:port", "host3[portX-portY]"] //-------------------------------------------------------------------------------------------- gateway類型,表示持久化數據存放位置,默認local,推薦的方式,此外還有NFS、HDFS、S3 gateway.type : local #集群名稱,區分集群的唯一名稱 cluster.name : 'TEST' #索引文件存放目錄 #path.data : '/var/elasticsearch/data' #日志文件存放目錄 #path.logs : '/var/elasticsearch/logs' #網絡配置 #network.tcp.keep_alive : true #network.tcp.send_buffer_size : 8192 #network.tcp.receive_buffer_size : 8192 #gateway.recover_after_nodes : 1 #gateway.recover_after_time : 10s #gateway.expected_nodes : 2 #自動發現相關配置 #discovery.zen.fd.connect_on_network_disconnect : true #discovery.zen.initial_ping_timeout : 10s #discovery.zen.fd.ping_interval : 2s #discovery.zen.fd.ping_retries : 10 #索引snapshot時間只對當gateway設置為NFS時有效 #index.gateway.snapshot_interval : 1s #刷新時間間隔 #index.engine.robin.refresh_interval : -1 #默認索引碎片數 index.number_of_shards : 3 #默認索引副本數 index.number_of_replicas : 1 #默認索引合並因子 #index.merge.policy.merge_factor : 100 #index.merge.policy.min_merge_docs : 1000 #index.merge.policy.use_compound_file : true #indices.memory.index_buffer_size : 5% #Gateway相關配置 # Gateway Settings #gateway: # recover_after_nodes: 1 # recover_after_time: 5m # expected_nodes: 2 #提示:當集群期望節點達不到的時候,集群就會處於block,無法正常索引和查詢,說明集群中某個節點未能正常啟動,這正是我們期望的效果,block住,避免照成數據的不一致 #強制所有內存鎖定,不要沒事搞個swap什么的來影響性能 # Force all memory to be locked, forcing JVM to never swap # (make sure to set MIN and MAX mem to the same value) #bootstrap: # mlockall: true #當禁用multcast廣播的時候,可以手動設置集群的節點ip # Unicast Discovery (disable multicast) #discovery: # zen: # multicast.enabled: false # unicast.hosts: ["host1", "host2"] -------------------------------------------------------------------------------- 默認配置為:節點每隔1s同master發送1次心跳,超時時間為30s,測試次數為3次,超過3次,則認為該節點同master已經脫離了。以上為elasticsearch的默認配置。在實際生產環境中,每隔1s,太頻繁了,會產生太多網絡流量。我們可以在elasticsearch.yml如下修改。 discovery.zen.fd.ping_timeout: 120s discovery.zen.fd.ping_retries: 6 discovery.zen.fd.ping_interval: 30s
三、elasticsearch7.0有哪些重大改進
1、徹底廢棄多type支持,包括api層面,之前版本可在一個索引庫下創建多個type。
2、徹底廢棄_all字段支持,為提升性能默認不再支持全文檢索,即7.0之后版本進行該項配置會報錯。
3、新增應用程序主動監測功能,搭配對應的kibana版本,用戶可監測應用服務的健康狀態,並在出現問題后及時發出通知。
4、取消query結果中hits count的支持(聚合查詢除外),使得查詢性能大幅提升(3x-7x faster)。這意味着,每次查詢后將不能得到精確的結果集數量。
5、新增intervals query ,用戶可設置多字符串在文檔中出現的先后順序進行檢索。
6、新增script_core ,通過此操作用戶可以精確控制返回結果的score分值。
7、優化集群協調子系統,縮減配置項提升穩定性。
8、新增 alias、date_nanos、features、vector等數據類型。
9、7.0自帶java環境,所以我們在安裝es時不再需要單獨下載和配置java_home。
10、7.0將不會再有OOM的情況,JVM引入了新的circuit breaker(熔斷)機制,當查詢或聚合的數據量超出單機處理的最大內存限制時會被截斷,並拋出異常(有點類似clickhouse)。
11、豐富多彩的kibana功能。
四、python api elasticsearch_dsl的使用
import datetime from elasticsearch_dsl import Document, Date, Nested, InnerDoc, Keyword, Integer, Long from elasticsearch_dsl import Search from elasticsearch import Elasticsearch from elasticsearch_dsl.connections import create_connection
class AppInfo(InnerDoc):
id = Keyword()
type = Keyword()
rank = Keyword()
class TemplateModel(Document):
keyword_id = Integer()
country_id = Integer()
hint = Integer()
keyword = Keyword()
search_count = Integer()
appstoreList = Nested(AppInfo)
class KeywordSearch(TemplateModel):
class Index:
# name = "index_name" # pass
settings = {
'number_of_shards': 5, # 分片
'number_of_replicas': 1, # 副本備份
'max_result_window': 20000, # 默認查詢數量
'refresh_interval': "30s",
# "translog": {"sync_interval": "15s", "durability": "async"}
}
class KeywordToApp(Document):
keyword_id = Keyword()
country_id = Keyword()
id = Keyword()
keyword = Keyword()
hint = Integer()
search_count = Integer()
app_type = Integer()
rank = Integer()
de = Integer()
class Index:
# name = "index_name" # pass
settings = {
'number_of_shards': 5, # 分片
'number_of_replicas': 1, # 副本備份
'max_result_window': 20000, # 默認查詢數量
'refresh_interval': "30s",
# "translog": {"sync_interval": "15s", "durability": "async"}
}
添加連接
client = Elasticsearch('127.0.0.1:9190') # 連接
print(client) # <Elasticsearch([{'host': '127.0.0.1', 'port': 9190}])>
client = create_connection(alias="alias_test", hosts=["127.0.0.1:9190"]) # 使用別名
print(KeywordSearch._get_connection(using="alias_test", )) # <Elasticsearch([{'host': '127.0.0.1', 'port': 9190}])>
# elasticsearch_dsl是高級模塊 elasticsearch是低級模塊
# elasticsearch_dsl 基於 elasticsearch
client = Elasticsearch('127.0.0.1', http_auth=('root', 'password'), timeout=3600) # 使用密碼認證
查看index詳情
indexs = client.indices.get('*') # 獲取所有的index詳情 print(indexs) print(indexs.keys()) # 所有index的名字 index = list(indexs.keys())[0] print(index) # 獲取第一個index的名字
創建和刪除index
KeywordSearch._index._name = f'1_test' # 賦值model中的index的名字 print(KeywordSearch._index._name) # 創建空index 方法1 mapping根據定義filed去生成 # print(KeywordSearch.init(using="alias_test", )) # ,使用model遷移創建index以及定義的mapping # 創建空index 方法2 無mapping # KeywordSearch._get_connection(using="alias_test", ).index(index=f'{i}_test') # 刪除index # print(KeywordSearch._get_connection(using="alias_test", ).indices.delete(index= f'1_test')) # 刪除index
寫入更新數據
更新保存數據1
KeywordSearch(_id=1,
keyword_id=1,
country_id=1,
appstoreList=[{"id": 1, "rank": 1}],
create_time=datetime.datetime.now().__format__("%Y-%m-%d %H:%M:%S")
).save(using="alias_test")
# 更新保存數據2
KeywordSearch._get_connection(using="alias_test", ).update(
index=f'1_test',
id=1,
body={"doc": {"keyword_id": 1,
"country_id": 1,
"appstoreList": [{"id": 2, "rank": 2}],
"create_time": datetime.datetime.now().__format__("%Y-%m-%d %H:%M:%S")},
"doc_as_upsert": True # 不存在便插入
}) # 數據更新 "_version": 2
查詢數據
class ElasticSearchUtil:
@classmethod
def InsertDocument(cls, using, index, body, id=None):
'''
插入一條數據body到指定的index,可指定Id,若不指定,會自動生成
'''
return using.index(index=index, body=body, id=id)
@classmethod
def bulkUpdate(cls, using, index, body):
'''
批量插入更新指定index、id對應的數據
'''
action = [{"_op_type": "update",
"_index": index,
"_type": "_doc",
"_id": str(i.get("keyword_id")) + str(i.get("de")),
"doc": i,
"doc_as_upsert": True} for i in body]
return helpers.bulk(using, action, index=index)
@classmethod
def deleteDocByQuery(cls, using, index, query):
'''
刪除idnex下符合條件query的所有數據
:return:
'''
return using.delete_by_query(index=index, body=query, conflicts="proceed", request_timeout=100)
@classmethod
def deleteDocByDeCount(cls, using, index, keyword_id, de_count):
'''
刪除idnex下符合條件query的所有數據
:return:
'''
query = {
"query": {
"bool": {
"filter": [{
"term": {"keyword_id": keyword_id},
},
{"range": {
"de": {"gte": de_count}
}
}
]
}
}
}
return using.delete_by_query(index=index, body=query, conflicts="proceed", request_timeout=100)
@classmethod
def searchDoc(cls, using, index=None, query=None):
'''
查找index下所有符合條件的數據
'''
return using.search(index=index, body=query, request_timeout=300)
@classmethod
def getDocById(cls, using, index, id):
'''
獲取指定index、id對應的數據
'''
return using.get(index=index, id=id)
@classmethod
def updateDocById(cls, using, index, id, body=None):
'''
更新id所對應的數據
'''
return using.update(index=index, id=id, body=body)
@classmethod
def updateDocByQuery(cls, using, index, query):
'''
批量更新 符合該條件的批量更改hint字段
query:
'''
return using.update_by_query(index=index, body=query, request_timeout=60)
@classmethod
def insertBulk(cls, using, index, body=None):
'''
批量插入doc
'''
return using.bulk(index=index, body=body, request_timeout=60)
if __name__ == '__main__': # 批量刪除上一次的數據 query = {'query': {'match': {'keyword_id': item.get("keyword_id")}}} ElasticSearchUtil.deleteDocByQuery(KeywordToApp._get_connection(using="search_1"), KeywordToApp._index._name, query=query) # 批量更新新的數據 if key_app: ElasticSearchUtil.insertBulk(KeywordToApp._get_connection(using="search_2"), KeywordToApp._index._name, key_app )
五、遇到的問題
1.默認node最大分片數是1000
如果單個node想要創建更多的index和分片
{"Content-Type":"application/json"} PUT 39.105.220.74:9190/_cluster/settings { "transient": { "cluster": { "max_shards_per_node": 10000 } } }
2.解決丟失分片。又yellow變gree
POST
http://180.76.153.235:9190/_cluster/reroute?retry_failed=true
{"Content-Type":"application/json"}