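Source Code Analysis of the ElasticSearch Index Operation
This article traces the source code that ElasticSearch executes when it creates an index. Following the execution flow, it looks at which services are involved in creating an index (for example AllocationService and MasterService). My understanding of distributed systems is not very deep, so there are many details and principles that I do not fully understand.
The create-index request. This only creates the index; no document is written.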
curl -X PUT "localhost:9200/twitter"
When the ElasticSearch server receives the client's create-index request, the index creation flow starts in org.elasticsearch.action.admin.indices.create.TransportCreateIndexAction.

Creating an index requires the ElasticSearch master node to take part, so TransportCreateIndexAction extends TransportMasterNodeAction. The actual work of creating the index is done by its instance field MetaDataCreateIndexService.
/**
* Create index action.
*/
public class TransportCreateIndexAction extends TransportMasterNodeAction<CreateIndexRequest, CreateIndexResponse> {
//
private final MetaDataCreateIndexService createIndexService;
MetaDataCreateIndexService.createIndex(...) calls the onlyCreateIndex method to perform the index creation.
public void createIndex(...)
{
onlyCreateIndex(request, ActionListener.wrap(response -> {
if (response.isAcknowledged()) {
activeShardsObserver.waitForActiveShards
}
Creates an index in the cluster state and waits for the specified number of shard copies to become active as specified in CreateIndexClusterStateUpdateRequest#waitForActiveShards() before sending the response on the listener.
Creating an index involves a check on active shards. By default, the index can be created as soon as the primary shard is active. If the number of active shards has not reached the requested number, the create-index request blocks until the cluster has that many active shards or the request times out. See the ActiveShardsObserver#waitForActiveShards(...) method.
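To make the "block until enough shards are active or time out" behaviour concrete, here is a minimal sketch. It is not how ActiveShardsObserver is implemented (which reacts to cluster state changes); it simply polls a counter. The class name ActiveShardsWaitSketch, the method signature and the polling interval are all assumptions made for illustration.

```java
import java.util.concurrent.TimeUnit;
import java.util.function.IntSupplier;

// Hypothetical helper, not an Elasticsearch class.
final class ActiveShardsWaitSketch {

    /**
     * Polls until at least `required` shard copies are active or the timeout elapses.
     * Returns true if the requirement was met in time, false on timeout or interruption.
     */
    static boolean waitForActiveShards(IntSupplier activeShardCount, int required,
                                       long timeoutMillis, long pollMillis) {
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
        while (true) {
            if (activeShardCount.getAsInt() >= required) {
                return true;                       // enough copies are active
            }
            if (System.nanoTime() - deadline >= 0) {
                return false;                      // timed out while waiting
            }
            try {
                Thread.sleep(pollMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
    }
}
```

With required set to 1 this mirrors the default described above: only the primary has to be active.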
Index creation is wrapped in an org.elasticsearch.cluster.metadata.MetaDataCreateIndexService.IndexCreationTask#IndexCreationTask object and is ultimately executed by PrioritizedEsThreadPoolExecutor, a thread pool backed by a priority task queue.

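An operation such as creating an index has to be made known to every node in the cluster and changes the cluster state, so IndexCreationTask extends AckedClusterStateUpdateTask.
The IndexCreationTask is submitted in MetaDataCreateIndexService#onlyCreateIndex(...).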
clusterService.submitStateUpdateTask("create-index [" + request.index() + "], cause [" + request.cause() + "]",
new IndexCreationTask(logger, allocationService, request, listener, indicesService, aliasValidator, xContentRegistry, settings,
this::validate));
Following the call stack of submitStateUpdateTasks(...), the lambda passed to map in org.elasticsearch.cluster.service.MasterService#submitStateUpdateTasks(...) converts the IndexCreationTask object into Batcher.UpdateTask, a Runnable task that the thread pool can execute.
public <T> void submitStateUpdateTasks(...,Map<T, ClusterStateTaskListener> tasks,...)
{
try {
List<Batcher.UpdateTask> safeTasks = tasks.entrySet().stream()
.map(e -> taskBatcher.new UpdateTask(config.priority(), source, e.getKey(), safe(e.getValue()), executor))
.collect(Collectors.toList());
//taskBatcher org.elasticsearch.cluster.service.TaskBatcher
taskBatcher.submitTasks(safeTasks, config.timeout());
}
}
//PrioritizedEsThreadPoolExecutor execute(...) submits the create-index task
public abstract class TaskBatcher {
private final PrioritizedEsThreadPoolExecutor threadExecutor;
public void submitTasks(...){
if (timeout != null) {
threadExecutor.execute(firstTask, timeout, () -> onTimeoutInternal(tasks, timeout));
} else {
threadExecutor.execute(firstTask);
}
}
}
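The inheritance hierarchy of org.elasticsearch.cluster.service.MasterService.Batcher.UpdateTask shows that it is a Runnable task. The create-index operation is ultimately submitted to and executed by the PrioritizedEsThreadPoolExecutor thread pool.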

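PrioritizedEsThreadPoolExecutor extends ThreadPoolExecutor. Reading its source shows how ElasticSearch builds a custom thread pool with a priority task queue, and it is also a good way to learn how to extend what a thread pool can do.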
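The general idea of a thread pool with a priority queue can be sketched with the plain JDK. This is not the Elasticsearch implementation: PriorityPoolSketch and PrioritizedTask are hypothetical names, and the ordering rule (lower number first, ties in submission order) is an assumption of this sketch. Tasks must be handed in with execute(), because submit() would wrap them in a FutureTask that is not Comparable.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch, not an Elasticsearch class.
final class PriorityPoolSketch {

    /** A Runnable with a priority: lower value runs first, ties run in submission order. */
    static final class PrioritizedTask implements Runnable, Comparable<PrioritizedTask> {
        private static final AtomicLong SEQUENCE = new AtomicLong();
        private final int priority;
        private final long insertionOrder = SEQUENCE.getAndIncrement();
        private final Runnable delegate;

        PrioritizedTask(int priority, Runnable delegate) {
            this.priority = priority;
            this.delegate = delegate;
        }

        @Override
        public void run() {
            delegate.run();
        }

        @Override
        public int compareTo(PrioritizedTask other) {
            int byPriority = Integer.compare(priority, other.priority);
            return byPriority != 0 ? byPriority : Long.compare(insertionOrder, other.insertionOrder);
        }
    }

    /** One worker thread fed from a priority queue instead of a FIFO queue. */
    static ThreadPoolExecutor newSingleThreadPriorityPool() {
        return new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,
                new PriorityBlockingQueue<Runnable>());
    }

    /** Queues three tasks behind a blocked worker and returns the order in which they ran. */
    static List<String> demo() {
        ThreadPoolExecutor pool = newSingleThreadPriorityPool();
        List<String> order = new CopyOnWriteArrayList<>();
        CountDownLatch gate = new CountDownLatch(1);
        // Occupy the only worker so the next three tasks have to wait in the queue.
        pool.execute(new PrioritizedTask(0, () -> {
            try {
                gate.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }));
        pool.execute(new PrioritizedTask(3, () -> order.add("low")));
        pool.execute(new PrioritizedTask(1, () -> order.add("urgent")));
        pool.execute(new PrioritizedTask(2, () -> order.add("normal")));
        gate.countDown();
        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return order;
    }
}
```

Although "low" is submitted first, demo() returns the tasks in the order urgent, normal, low, because the worker takes the smallest element from the queue each time.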

Following the code of threadExecutor.execute(...):
public void execute(Runnable command, final TimeValue timeout, final Runnable timeoutCallback) {
//add some extra capabilities to the Runnable task, such as a priority
command = wrapRunnable(command);
//
doExecute(command);
}
//EsThreadPoolExecutor
protected void doExecute(final Runnable command) {
try {
super.execute(command);//submit the task
}catch (EsRejectedExecutionException ex) {
if (command instanceof AbstractRunnable) {
// If we are an abstract runnable we can handle the rejection
// directly and don't need to rethrow it.
try {
((AbstractRunnable) command).onRejection(ex);
} finally {
((AbstractRunnable) command).onAfter();
}
}
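Of course, since PrioritizedEsThreadPoolExecutor extends ThreadPoolExecutor, the task is ultimately executed in ThreadPoolExecutor#runWorker(Worker w), which is called from the run() method of the inner class Worker. See point 3 of the article "Exploring the Thread Pool Implementation in ElasticSearch".
The above covers how the thread executes the task. The actual business logic (creating the index and updating the cluster state) lives in Runnable#run(), that is, in the org.elasticsearch.cluster.service.TaskBatcher.BatchedTask#run() method.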
//BatchedTask
public void run() {runIfNotProcessed(this);}
void runIfNotProcessed(BatchedTask updateTask) {
//check the task: is it a duplicate, has it already been executed, ...
//other unrelated code omitted....
run(updateTask.batchingKey, toExecute, tasksSummary);
}
/**
* Action to be implemented by the specific batching implementation
* All tasks have the given batching key.
*/
protected abstract void run(Object batchingKey, List<? extends BatchedTask> tasks, String tasksSummary);
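The effect of runIfNotProcessed can be pictured with a much simplified model: tasks that share a batching key are collected, the first one to run takes the whole group, and the ones that run later find nothing left to do. BatchingSketch is a hypothetical class written for this article; using a String as the batching key is an assumption (in the excerpt below the key is cast to the task executor).

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch, not an Elasticsearch class.
final class BatchingSketch {
    private final Map<String, List<String>> pendingByKey = new HashMap<>();
    // Every element is one batch that was executed together.
    final List<List<String>> executedBatches = new ArrayList<>();

    /** Queues a task under its batching key and returns the Runnable a thread pool would run. */
    synchronized Runnable submit(String batchingKey, String task) {
        pendingByKey.computeIfAbsent(batchingKey, k -> new ArrayList<>()).add(task);
        return () -> runIfNotProcessed(batchingKey);
    }

    /** Takes every pending task with the same key; does nothing if an earlier run already took them. */
    private synchronized void runIfNotProcessed(String batchingKey) {
        List<String> batch = pendingByKey.remove(batchingKey);
        if (batch == null) {
            return; // already processed as part of an earlier batch
        }
        executedBatches.add(batch);
    }
}
```

Submitting "t1" and "t2" under the same key and then running both returned Runnables produces a single batch containing both tasks; the second Runnable is a no-op.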
The abstract run(...) is implemented in org.elasticsearch.cluster.service.MasterService.Batcher#run:
@Override
protected void run(Object batchingKey, List<? extends BatchedTask> tasks, String tasksSummary) {
ClusterStateTaskExecutor<Object> taskExecutor = (ClusterStateTaskExecutor<Object>) batchingKey;
List<UpdateTask> updateTasks = (List<UpdateTask>) tasks;
//TaskInputs Represents a set of tasks to be processed together with their executor
runTasks(new TaskInputs(taskExecutor, updateTasks, tasksSummary));
}
//the logic that finally applies the state update
protected void runTasks(TaskInputs taskInputs) {
final ClusterState previousClusterState = state();
//change the cluster state (the per-shard handling logic)
TaskOutputs taskOutputs = calculateTaskOutputs(taskInputs, previousClusterState, startTimeNS);
//publish the changed state to the other nodes
if (taskOutputs.clusterStateUnchanged()) {
//no change in the cluster state was detected
}else{
ClusterState newClusterState = taskOutputs.newClusterState;
try {
ClusterChangedEvent clusterChangedEvent = new ClusterChangedEvent(summary, newClusterState, previousClusterState);
//Returns the DiscoveryNodes.Delta between the previous cluster state and the new cluster state.
final DiscoveryNodes.Delta nodesDelta = clusterChangedEvent.nodesDelta();
}
if (nodesDelta.hasChanges() && logger.isInfoEnabled()) {
String nodeSummary = nodesDelta.shortSummary();
if (nodeSummary.length() > 0) {
logger.info("{}, reason: {}", summary, nodeSummary);
}
}
//Called when the result of the ClusterStateTaskExecutor#execute(ClusterState, List) have
//been processed properly by all listeners.
taskOutputs.processedDifferentClusterState(previousClusterState, newClusterState);
//Callback invoked after new cluster state is published
taskOutputs.clusterStatePublished(clusterChangedEvent);
}
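The shape of runTasks (compute a new state from a batch of tasks, then publish only if something changed) can be sketched as follows. This is a toy model under stated assumptions: StateUpdateSketch, State and ensureIndex are hypothetical names, the "cluster state" is just a version plus a list of index names, a task signals "no change" by returning the same instance, and "publishing" only records the version.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.UnaryOperator;

// Hypothetical sketch, not an Elasticsearch class.
final class StateUpdateSketch {

    /** Immutable stand-in for the cluster state. */
    static final class State {
        final long version;
        final List<String> indices;

        State(long version, List<String> indices) {
            this.version = version;
            this.indices = Collections.unmodifiableList(new ArrayList<>(indices));
        }
    }

    private State current = new State(0, new ArrayList<>());
    // Versions that were "published"; stands in for sending the state to other nodes.
    final List<Long> publishedVersions = new ArrayList<>();

    /** Applies a batch of tasks; bumps the version and publishes only if the state changed. */
    void runTasks(List<UnaryOperator<State>> tasks) {
        State previous = current;
        State next = previous;
        for (UnaryOperator<State> task : tasks) {
            next = task.apply(next);
        }
        if (next == previous) {
            return; // no task changed anything, so there is nothing to publish
        }
        current = new State(previous.version + 1, next.indices);
        publishedVersions.add(current.version);
    }

    /** A task that adds an index name if it is missing and otherwise returns the same state. */
    static UnaryOperator<State> ensureIndex(String name) {
        return state -> {
            if (state.indices.contains(name)) {
                return state;
            }
            List<String> copy = new ArrayList<>(state.indices);
            copy.add(name);
            return new State(state.version, copy);
        };
    }

    State current() {
        return current;
    }
}
```

Running ensureIndex("twitter") twice publishes version 1 once; the second run leaves the state untouched and publishes nothing.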
The line TaskOutputs taskOutputs = calculateTaskOutputs(taskInputs, previousClusterState, startTimeNS); takes the create-index task as input and outputs the resulting change to the cluster state.
public TaskOutputs calculateTaskOutputs(TaskInputs taskInputs, ClusterState previousClusterState) {
ClusterTasksResult<Object> clusterTasksResult = executeTasks(taskInputs, startTimeNS, previousClusterState);
//...
}
protected ClusterTasksResult<Object> executeTasks(TaskInputs taskInputs,...){
List<Object> inputs = taskInputs.updateTasks.stream().map(tUpdateTask -> tUpdateTask.task).collect(Collectors.toList());
//ShardStartedClusterStateTaskExecutor#execute
clusterTasksResult = taskInputs.executor.execute(previousClusterState, inputs);
}
public ClusterTasksResult<StartedShardEntry> execute(ClusterState currentState, List<StartedShardEntry> tasks)
{
List<ShardRouting> shardRoutingsToBeApplied = new ArrayList<>(tasks.size());
for (StartedShardEntry task : tasks) {
ShardRouting matched = currentState.getRoutingTable().getByAllocationId(task.shardId, task.allocationId);
//....other code omitted
shardRoutingsToBeApplied.add(matched);
}
maybeUpdatedState = allocationService.applyStartedShards(currentState, shardRoutingsToBeApplied);
builder.successes(tasksToBeApplied);
}
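In the end, the state of each shard is updated in the org.elasticsearch.cluster.action.shard.ShardStateAction.ShardStartedClusterStateTaskExecutor#execute method. I do not fully understand the implementation; it involves the ShardRouting routing table and AllocationService.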
AllocationService manages the node allocation of a cluster. For this reason the AllocationService keeps AllocationDeciders to choose nodes for shard allocation. This class also manages new nodes joining the cluster and rerouting of shards.
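Elasticsearch Cluster State
The cluster state includes the cluster uuid, the version number, index settings together with the record of modifications and deletions, how shards are allocated across the nodes, and so on. It is important that every node holds a consistent cluster state; TLA+ is one way to verify the consistency of the cluster state.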
The cluster state contains important metadata about the cluster, including what the mappings look like, what settings the indices have, which shards are allocated to which nodes, etc. Inconsistencies in the cluster state can have the most horrid consequences including inconsistent search results and data loss, and the job of the cluster state coordination subsystem is to prevent any such inconsistencies.
Example cluster state:
cluster uuid: 3LZs2L1TRiCw6P2Xm6jfSQ
version: 7
state uuid: bPCusHNGRrCxGcEpkA6XkQ
from_diff: false
meta data version: 34
[twitter/5gMqkF9oQaCdCCVXs7VrtA]: v[9]
0: p_term [2], isa_ids [QSYDJpzBRtOQjUDJIIPm7g]
1: p_term [2], isa_ids [LF3sOw51R1eS7XS_iXkkvQ]
2: p_term [2], isa_ids [gEfexQgbQRmd1qRplOjmag]
3: p_term [2], isa_ids [yZkB1nFHT22wtBnDBqGsKQ]
4: p_term [2], isa_ids [9oFMzwuwSOK1Ir-1SLxqHg]
metadata customs:
index-graveyard: IndexGraveyard[[[index=[twitter/KisMFiobQDSN23mjdugD0g], deleteDate=2018/12/25 02:05:54], [index=[twitter/trTv2ambSFOvKlGr_y0IKw], deleteDate=2019/01/03 03:19:44], [index=[twitter/sfWVXeklQ321QFxwLxSUPw], deleteDate=2019/01/03 09:51:45]]] ingest: org.elasticsearch.ingest.IngestMetadata@6d83dbd7 licenses: LicensesMetaData{license={"uid":"2a6f6ac2-2b3a-4e7b-a6c6-aed3e6e8edce","type":"basic","issue_date_in_millis":1545294198272,"max_nodes":1000,"issued_to":"elasticsearch","issuer":"elasticsearch","signature":"/////QAAANDadY9WjYQDyz2N6XstmWiReALKju/xLVk8VGXRfRbPPJxRbjxUMfiX9PHLz5AdfV2aFaGS6aGTtzoyKC5sOZQQbXCxzq8YTt+zbs+ld5OxOfDJ3yMVaJS5vAZuIlQQfkmMdIAnq7VolQbiADUHjKJnIZc0/Sb51YEUTtPykjPRrHF0NEKCOfhbQ4Jn5xOaweKvsTjOqxp1JJkOUOA+vvGqgxuZVxbDATEnW+6+kGP8WdkcvRpFlhwKMAKso9LzPaJ3NCO4zrZ+N9WUfA+TlRz4","start_date_in_millis":-1}, trialVersion=null}
nodes:
{debug_node}{JLqmOfYoTcS8IENG4pmnOA}{yhUOUQfXS7-Xzzm8_wzjoA}{127.0.0.1}{127.0.0.1:9300}{ml.machine_memory=8277266432, xpack.installed=true, ml.max_open_jobs=20, ml.enabled=true}, local, master
routing_table (version 5):
-- index [[twitter/5gMqkF9oQaCdCCVXs7VrtA]]
----shard_id [twitter][0]
--------[twitter][0], node[JLqmOfYoTcS8IENG4pmnOA], [P], s[STARTED], a[id=QSYDJpzBRtOQjUDJIIPm7g]
--------[twitter][0], node[null], [R], recovery_source[peer recovery], s[UNASSIGNED], unassigned_info[[reason=CLUSTER_RECOVERED], at[2019-01-22T00:50:44.164Z], delayed=false, allocation_status[no_attempt]]
----shard_id [twitter][1]
--------[twitter][1], node[JLqmOfYoTcS8IENG4pmnOA], [P], s[STARTED], a[id=LF3sOw51R1eS7XS_iXkkvQ]
--------[twitter][1], node[null], [R], recovery_source[peer recovery], s[UNASSIGNED], unassigned_info[[reason=CLUSTER_RECOVERED], at[2019-01-22T00:50:44.164Z], delayed=false, allocation_status[no_attempt]]
----shard_id [twitter][2]
--------[twitter][2], node[JLqmOfYoTcS8IENG4pmnOA], [P], s[STARTED], a[id=gEfexQgbQRmd1qRplOjmag]
--------[twitter][2], node[null], [R], recovery_source[peer recovery], s[UNASSIGNED], unassigned_info[[reason=CLUSTER_RECOVERED], at[2019-01-22T00:50:44.164Z], delayed=false, allocation_status[no_attempt]]
----shard_id [twitter][3]
--------[twitter][3], node[JLqmOfYoTcS8IENG4pmnOA], [P], s[STARTED], a[id=yZkB1nFHT22wtBnDBqGsKQ]
--------[twitter][3], node[null], [R], recovery_source[peer recovery], s[UNASSIGNED], unassigned_info[[reason=CLUSTER_RECOVERED], at[2019-01-22T00:50:44.164Z], delayed=false, allocation_status[no_attempt]]
----shard_id [twitter][4]
--------[twitter][4], node[JLqmOfYoTcS8IENG4pmnOA], [P], s[STARTED], a[id=9oFMzwuwSOK1Ir-1SLxqHg]
--------[twitter][4], node[null], [R], recovery_source[peer recovery], s[UNASSIGNED], unassigned_info[[reason=CLUSTER_RECOVERED], at[2019-01-22T00:50:44.164Z], delayed=false, allocation_status[no_attempt]]
routing_nodes:
-----node_id[JLqmOfYoTcS8IENG4pmnOA][V]
--------[twitter][1], node[JLqmOfYoTcS8IENG4pmnOA], [P], s[STARTED], a[id=LF3sOw51R1eS7XS_iXkkvQ]
--------[twitter][4], node[JLqmOfYoTcS8IENG4pmnOA], [P], s[STARTED], a[id=9oFMzwuwSOK1Ir-1SLxqHg]
--------[twitter][3], node[JLqmOfYoTcS8IENG4pmnOA], [P], s[STARTED], a[id=yZkB1nFHT22wtBnDBqGsKQ]
--------[twitter][2], node[JLqmOfYoTcS8IENG4pmnOA], [P], s[STARTED], a[id=gEfexQgbQRmd1qRplOjmag]
--------[twitter][0], node[JLqmOfYoTcS8IENG4pmnOA], [P], s[STARTED], a[id=QSYDJpzBRtOQjUDJIIPm7g]
---- unassigned
--------[twitter][1], node[null], [R], recovery_source[peer recovery], s[UNASSIGNED], unassigned_info[[reason=CLUSTER_RECOVERED], at[2019-01-22T00:50:44.164Z], delayed=false, allocation_status[no_attempt]]
--------[twitter][4], node[null], [R], recovery_source[peer recovery], s[UNASSIGNED], unassigned_info[[reason=CLUSTER_RECOVERED], at[2019-01-22T00:50:44.164Z], delayed=false, allocation_status[no_attempt]]
--------[twitter][3], node[null], [R], recovery_source[peer recovery], s[UNASSIGNED], unassigned_info[[reason=CLUSTER_RECOVERED], at[2019-01-22T00:50:44.164Z], delayed=false, allocation_status[no_attempt]]
--------[twitter][2], node[null], [R], recovery_source[peer recovery], s[UNASSIGNED], unassigned_info[[reason=CLUSTER_RECOVERED], at[2019-01-22T00:50:44.164Z], delayed=false, allocation_status[no_attempt]]
--------[twitter][0], node[null], [R], recovery_source[peer recovery], s[UNASSIGNED], unassigned_info[[reason=CLUSTER_RECOVERED], at[2019-01-22T00:50:44.164Z], delayed=false, allocation_status[no_attempt]]
customs:
snapshots: SnapshotsInProgress[] restore: RestoreInProgress[] snapshot_deletions: SnapshotDeletionsInProgress[] security_tokens: TokenMetaData{ everything is secret }
Indexing a Document
PUT twitter/_doc/1
{
"user" : "kimchy",
"post_date" : "2009-11-15T14:12:12",
"message" : "trying out Elasticsearch"
}
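Here a document is written to the index at the same time as the index is created.
The whole flow starts at the TransportBulkAction#doExecute(...) method and has two parts: creating the index and writing the document.
Index creation is implemented by createIndex(String index, TimeValue timeout, ActionListener<CreateIndexResponse> listener). The full flow is: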
//1 TransportBulkAction#doExecute(BulkRequest, ActionListener<BulkResponse>)
for (String index : autoCreateIndices) {
createIndex(index, bulkRequest.timeout(), new ActionListener<CreateIndexResponse>()
//2 TransportBulkAction#createIndex
void createIndex(String index, TimeValue timeout, ActionListener<CreateIndexResponse> listener) {
createIndexAction.execute(createIndexRequest, listener);
}
//3 TransportAction#execute(Request, ActionListener<Response>)
execute(task, request, new ActionListener<Response>() {...}
//4 TransportAction#execute(Task, Request, ActionListener<Response>)
requestFilterChain.proceed(task, actionName, request, listener);
//5 TransportAction.RequestFilterChain#proceed(Task,String,Request,ActionListener<Response>)
this.action.doExecute(task, request, listener);
//6 TransportMasterNodeAction#doExecute
protected void doExecute(Task task, final Request request, ActionListener<Response> listener) {
new AsyncSingleAction(task, request, listener).start();
}
//7 TransportMasterNodeAction.AsyncSingleAction#start
public void start() {doStart(state);}
//8 TransportMasterNodeAction.AsyncSingleAction#doStart
threadPool.executor(executor).execute(new ActionRunnable(delegate) {
@Override
protected void doRun() throws Exception {
masterOperation(task, request, clusterState, delegate);
}
});
//9 TransportMasterNodeAction#masterOperation(Task, Request,ClusterState,ActionListener<Response>)
masterOperation(request, state, listener);
//10 TransportCreateIndexAction#masterOperation
createIndexService.createIndex(updateRequest, ActionListener.wrap(response ->
listener.onResponse(new CreateIndexResponse(response.isAcknowledged(), response.isShardsAcknowledged(), indexName)),
listener::onFailure));
//11 MetaDataCreateIndexService.createIndex(...)
//from here on it is the MetaDataCreateIndexService index creation flow described earlier in this article
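Writing the document is implemented by executeBulk(task, bulkRequest, startTime, listener, responses, indicesThatCannotBeCreated). The code flow is shown below. The document is written to the primary shard first, and most of this is implemented in the TransportShardBulkAction class.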
//1 TransportBulkAction#executeBulk
executeBulk(task, bulkRequest, startTime, listener, responses, indicesThatCannotBeCreated)
//2 org.elasticsearch.common.util.concurrent.AbstractRunnable#run
new BulkOperation(task, bulkRequest, listener, responses, startTimeNanos, indicesThatCannotBeCreated).run();
//3 TransportBulkAction.BulkOperation#doRun
shardBulkAction.execute(bulkShardRequest, new ActionListener<BulkShardResponse>()
//4 TransportAction#execute(Request, org.elasticsearch.action.ActionListener<Response>)
execute(task, request, new ActionListener<Response>() {
//5 TransportAction.RequestFilterChain#proceed
// from this step on, the flow is the same as step 4 of index creation above: the task is submitted through TransportAction#doExecute
requestFilterChain.proceed(task, actionName, request, listener)
//6 --->every TransportXXXAction implements TransportAction#doExecute
//index creation: TransportBulkAction#doExecute()
//forwarding to the primary shard for the write: TransportReplicationAction#doExecute()
this.action.doExecute(task, request, listener);
//7 TransportReplicationAction#doExecute(Task, Request, ActionListener<Response>)
protected void doExecute(Task task, Request request, ActionListener<Response> listener) {
new ReroutePhase((ReplicationTask) task, request, listener).run();
}
//8 TransportReplicationAction.ReroutePhase#doRun
//look up the primary shard; example below: the document is rerouted to primary shard number 3
//[twitter][3], node[JLqmOfYoTcS8IENG4pmnOA], [P], s[STARTED], a[id=N8n0QgxBQVeHljx1RpkYMg]
final ShardRouting primary = primary(state)
//which node is the primary shard on?
final DiscoveryNode node = state.nodes().get(primary.currentNodeId());
//use the node id to check whether the primary shard is on the current node
if (primary.currentNodeId().equals(state.nodes().getLocalNodeId())) {
//the primary shard is on this node
performLocalAction(state, primary, node, indexMetaData);
} else {
//the primary shard is not on this node, so the index operation must be forwarded to the right node
performRemoteAction(state, primary, node);
}
//9 TransportReplicationAction.ReroutePhase#performLocalAction (assuming the primary shard is on the local node)
//two important concepts (fields) appear here: allocationId and primary term
performAction(node, transportPrimaryAction, true,
new ConcreteShardRequest<>(request, primary.allocationId().getId(), indexMetaData.primaryTerm(primary.id())));
//10 TransportReplicationAction.ReroutePhase#performAction
transportService.sendRequest(node, action, requestToPerform, transportOptions, new TransportResponseHandler<Response>() {
//11 TransportService#sendRequest(Transport.Connection, String, TransportRequest, TransportRequestOptions, TransportResponseHandler<T>)
asyncSender.sendRequest(connection, action, request, options, handler);
sendLocalRequest(requestId, action, request, options)
//TransportService#sendLocalRequest
final String executor = reg.getExecutor();
//at this point the task is finally submitted through EsThreadPoolExecutor#execute
threadPool.executor(executor).execute(new AbstractRunnable() {
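The decision made in step 8 (is the primary shard on this node, or must the request be forwarded?) comes down to comparing two node ids. A minimal sketch, assuming a plain map from shard number to the node id of its primary; RerouteSketch, Target and route are hypothetical names, and throwing on an unassigned primary is a simplification of this sketch.

```java
import java.util.Map;

// Hypothetical sketch, not an Elasticsearch class.
final class RerouteSketch {
    enum Target { LOCAL, REMOTE }

    /** Decides where a write for the given shard has to be performed. */
    static Target route(Map<Integer, String> primaryNodeByShard, int shardId, String localNodeId) {
        String primaryNodeId = primaryNodeByShard.get(shardId);
        if (primaryNodeId == null) {
            // Simplification: this sketch fails fast when no primary is assigned.
            throw new IllegalStateException("primary of shard " + shardId + " is not assigned");
        }
        return primaryNodeId.equals(localNodeId) ? Target.LOCAL : Target.REMOTE;
    }
}
```

In the single-node example from this article every primary lives on node JLqmOfYoTcS8IENG4pmnOA, so every write would take the LOCAL branch.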
AllocationID
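allocationId is a field of the org.elasticsearch.cluster.routing.ShardRouting class. To quote a passage from a book: "The Allocation ID is stored in the shard-level metadata, and every shard has a unique Allocation ID." In addition, the master node keeps, in the cluster-level metadata, a set of the Allocation IDs of the shards that are considered up to date. This set is called the in-sync allocation IDs.
Next, the comment on the org.elasticsearch.cluster.routing.AllocationId class:
Uniquely identifies an allocation. An allocation is a shard moving from unassigned to initializing, or relocation.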
Relocation is a special case, where the origin shard is relocating with a relocationId and same id, and the target shard (only materialized in RoutingNodes) is initializing with the id set to the origin shard relocationId. Once relocation is done, the new allocation id is set to the relocationId. This is similar behavior to how ShardRouting#currentNodeId is used.
Next, the official documentation on the ES replication model: the shards in the in-sync set hold the documents written by the most recent index operations.
Elasticsearch maintains a list of shard copies that should receive the operation. This list is called the in-sync copies and is maintained by the master node. As the name implies, these are the set of "good" shard copies that are guaranteed to have processed all of the index and delete operations that have been acknowledged to the user. The primary is responsible for maintaining this invariant and thus has to replicate all operations to each copy in this set.
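Under normal conditions the primary shard is always a member of the in-sync set; it is a "good" shard copy. When the machine hosting the primary shard goes down, the master node immediately picks a replica shard from the in-sync set to act as the primary. Promoting this replica to primary is fast, because the shards in the in-sync set hold the latest data, and it also avoids the data loss that would follow from promoting a shard that is not fully up to date. For a longer explanation, if you have the patience, see: "ElasticSearch Index Analysis".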
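The bookkeeping described here can be modelled in a few lines. This is only a sketch of the idea, not the master's actual logic: InSyncSketch and its methods are hypothetical, allocation IDs are plain strings, and "pick the first remaining in-sync copy" is an arbitrary choice made for the example.

```java
import java.util.LinkedHashSet;
import java.util.Optional;
import java.util.Set;

// Hypothetical sketch, not an Elasticsearch class.
final class InSyncSketch {
    private final Set<String> inSyncAllocationIds = new LinkedHashSet<>();
    private String primaryAllocationId;

    InSyncSketch(String primaryAllocationId) {
        this.primaryAllocationId = primaryAllocationId;
        inSyncAllocationIds.add(primaryAllocationId);
    }

    /** A replica has caught up with the primary and joins the in-sync set. */
    void markInSync(String allocationId) {
        inSyncAllocationIds.add(allocationId);
    }

    /** A replica missed an acknowledged operation and is removed from the in-sync set. */
    void markStale(String allocationId) {
        if (!allocationId.equals(primaryAllocationId)) {
            inSyncAllocationIds.remove(allocationId);
        }
    }

    /** The primary failed: promote a remaining in-sync copy, or report that none is left. */
    Optional<String> failPrimary() {
        inSyncAllocationIds.remove(primaryAllocationId);
        primaryAllocationId = inSyncAllocationIds.isEmpty()
                ? null
                : inSyncAllocationIds.iterator().next();
        return Optional.ofNullable(primaryAllocationId);
    }
}
```

A copy that was marked stale is never a promotion candidate, which is exactly what protects against promoting a shard that lacks acknowledged writes.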
primary term
On primary term, see elasticsearch-sequence-ids and the source comment below:
The term of the current selected primary. This is a non-negative number incremented when a primary shard is assigned after a full cluster restart or a replica shard is promoted to a primary. Note: since we increment the term every time a shard is assigned, the term for any operational shard (i.e., a shard that can be indexed into) is larger than 0.
Sequence IDs give index operations a global sequence number.
The primary shard accepts indexing operations (indexing operations are things like "add documents" or "delete a document") first and replicates the index operations to replicas. It is therefore fairly straightforward to keep incrementing a counter and assign each operation a sequence number before it's being forwarded to the replicas.
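After reading that article, you can see how hard it is to design a globally unique ID for a distributed system. Sometimes the hard part is not how to generate such an ID, because generating a globally unique ID is only a means. The real question is what the global ID is needed for, and whether it still guarantees correctness across all the scenarios it is applied to. Introducing the ID may solve the problem at hand, but the problems it causes in turn, or problems nobody has thought of yet, can very well lead to incorrect data. In the specific case of the ElasticSearch index operation, after a document is written to the primary shard it also has to be written to the replicas for durability. The ES data replication model borrows from the pacifica-replication-in-log-based-distributed-storage-systems algorithm and also uses a TLA+ specification. Introducing the primary term makes it possible to tell whether an index operation was executed by an old primary shard or by the current, new primary shard. The primary term of each shard is maintained by the master and persisted in the cluster state. Whenever a new primary is selected for a shard (for example when a replica is promoted to primary, or a primary is assigned after a full cluster restart), the primary term is incremented by 1. This resolves the data inconsistencies that can arise under concurrent writes.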
The first step we took was to be able to distinguish between old and new primaries. We have to have a way to identify operations that came from an older primary vs operations that come from a newer primary. On top of that, the entire cluster needs to be in agreement as to that so there's no contention in the event of a problem. This drove us to implement primary terms.
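How a term lets a shard copy tell an old primary from a new one can be shown with a tiny model. It assumes that every replicated operation carries the primary term it was issued under and that a copy rejects anything older than the newest term it has seen; PrimaryTermSketch is a hypothetical class, not Elasticsearch code.

```java
// Hypothetical sketch, not an Elasticsearch class.
final class PrimaryTermSketch {
    private long currentTerm;

    PrimaryTermSketch(long initialTerm) {
        this.currentTerm = initialTerm;
    }

    /** Accepts an operation unless it was issued under an older primary term. */
    boolean accept(long operationTerm) {
        if (operationTerm < currentTerm) {
            return false;            // sent by a primary that has since been replaced
        }
        currentTerm = operationTerm; // remember the newest term seen so far
        return true;
    }

    long currentTerm() {
        return currentTerm;
    }
}
```

Once a copy has seen an operation with term 3, a late operation from the old primary with term 2 is refused, so the two primaries cannot both write to it.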
global checkpoints and local checkpoints
What the global checkpoint does:
The global checkpoint is a sequence number for which we know that all active shards histories are aligned at least up to it.
The active shards here should be the shards in the in-sync set discussed above.
all operations with a lower sequence number than the global checkpoint are guaranteed to have been processed by all active shards and are equal in their respective histories. This means that after a primary fails, we only need to compare operations above the last known global checkpoint between the new primary and any remaining replicas.
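With the global checkpoint in place, when the current primary shard goes down because of a failure, it becomes the old primary shard, and the master picks a replica from the in-sync set as the new primary shard. Index operations from the client can then continue against the new primary shard (this is also why index operations have a default timeout of 1 minute: as long as a new primary shard is chosen within that minute, the client's index operation is not affected). When the old primary shard comes back, its global checkpoint is compared with the global checkpoint of the new primary shard and the data is synchronized.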
Advancing the global checkpoint is the responsibility of the primary shard. It does so by keeping track of completed operations on the replicas. Once it detects that all replicas have advanced beyond a given sequence number, it will update the global checkpoint accordingly.
The global checkpoint is advanced by the primary shard. For how the primary shard updates its value, see the pacifica-replication-in-log-based-distributed-storage-systems paper mentioned above. In addition, the animation in elasticsearch-sequence-ids-6-0 shows how the checkpoints are updated.
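The relationship between local and global checkpoints can be written down directly. This sketch assumes sequence numbers start at 0, uses -1 to mean "nothing processed yet", and takes the global checkpoint to be the minimum local checkpoint over the in-sync copies; CheckpointSketch and LocalCheckpointTracker here are hypothetical illustrations, not the Elasticsearch classes.

```java
import java.util.Collection;
import java.util.HashSet;
import java.util.Set;

// Hypothetical sketch, not an Elasticsearch class.
final class CheckpointSketch {

    /** Local checkpoint of one shard copy: the highest n such that 0..n have all been processed. */
    static final class LocalCheckpointTracker {
        private final Set<Long> processedAboveCheckpoint = new HashSet<>();
        private long checkpoint = -1;

        void markProcessed(long seqNo) {
            processedAboveCheckpoint.add(seqNo);
            // Advance while the next sequence number has already been processed.
            while (processedAboveCheckpoint.remove(checkpoint + 1)) {
                checkpoint++;
            }
        }

        long checkpoint() {
            return checkpoint;
        }
    }

    /** Global checkpoint: the smallest local checkpoint among the in-sync copies. */
    static long globalCheckpoint(Collection<Long> localCheckpoints) {
        if (localCheckpoints.isEmpty()) {
            return -1;
        }
        long min = Long.MAX_VALUE;
        for (long localCheckpoint : localCheckpoints) {
            min = Math.min(min, localCheckpoint);
        }
        return min;
    }
}
```

A copy that has processed 0, 1 and 3 has local checkpoint 1, because 2 is still missing. As soon as 2 arrives the checkpoint jumps to 3.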
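One step in the animation goes like this: when the client sends index requests for documents 4, 5 and 6, the current primary shard replicates documents 5 and 6 to the shard in the middle and documents 4 and 6 to the rightmost shard, and then the primary shard dies. The master then chooses the middle shard as the new primary shard. So where does document 4 on the new primary shard come from?
My understanding is this: as mentioned earlier, an index operation from the client has a default timeout of 1 minute. If the client does not receive an acknowledgement that indexing succeeded, the ElasticSearch High Level REST Java API should resend the request. The new request is then routed to the middle shard, which is the new primary shard, and that is how the new primary shard ends up with document 4.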

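This issue describes the basic steps of the index operation (index flow).
Summary
- An ElasticSearch index operation (Action) is converted into a Runnable task and submitted to a thread pool, which creates the index asynchronously.
- Creating an index changes the cluster state, so an update task is created to update the cluster state.
- Once the various tasks have been converted into Runnable objects, org.elasticsearch.common.util.concurrent.AbstractRunnable is their base class. Every TransportXXXAction extends TransportAction and submits its task through a thread pool in doExecute(). My biggest takeaway is this design approach: converting and wrapping the different tasks, calling back with the results, and submitting all tasks to thread pools in a uniform way.
- AbstractRunnable extends what Runnable can do. It implements public void onRejection(Exception e), which is called when the thread pool rejects a submitted task (the pool's rejection policy). It also declares the abstract method public abstract void onFailure(Exception e);, which is called when an exception occurs while the task is running. (This method is invoked for all exception thrown by doRun())
- AbstractRunnable shows how to extend what a thread's task can do. The java.lang.Runnable interface does not allow run() to throw checked exceptions, so how can exceptions raised inside run() be handled gracefully? AbstractRunnable overrides run() as shown below. All business logic goes into doRun(), which is why every TransportXXXAction in the ES source ends up executing a doRun() method. Every exception thrown during execution is handled by onFailure(), and because onFailure() is abstract, each Action can have its own exception handling. I cannot help admiring this design.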
@Override
public final void run() {
try {
doRun();
} catch (Exception t) {
onFailure(t);
} finally {
onAfter();
}
}
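To see the pattern work outside of Elasticsearch, here is a self-contained sketch modelled on the run() method above. SafeRunnable is a hypothetical stand-in for AbstractRunnable, and giving onAfter() an empty default body is an assumption of this sketch.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch, not an Elasticsearch class.
final class AbstractRunnableSketch {

    /** Simplified stand-in for AbstractRunnable. */
    abstract static class SafeRunnable implements Runnable {
        /** The business logic; unlike Runnable#run() it may throw checked exceptions. */
        protected abstract void doRun() throws Exception;

        /** Called for every exception thrown by doRun(). */
        protected abstract void onFailure(Exception e);

        /** Called after doRun() finishes, whether it succeeded or failed. */
        protected void onAfter() {
        }

        @Override
        public final void run() {
            try {
                doRun();
            } catch (Exception e) {
                onFailure(e);
            } finally {
                onAfter();
            }
        }
    }

    /** Runs a task whose doRun() throws and returns the callbacks that fired, in order. */
    static List<String> demo() {
        List<String> events = new ArrayList<>();
        Runnable task = new SafeRunnable() {
            @Override
            protected void doRun() throws Exception {
                throw new IOException("disk full");
            }

            @Override
            protected void onFailure(Exception e) {
                events.add("failed: " + e.getMessage());
            }

            @Override
            protected void onAfter() {
                events.add("after");
            }
        };
        task.run();
        return events;
    }
}
```

The checked IOException never escapes run(): it is routed to onFailure(), and onAfter() still runs afterwards, so demo() returns the two events "failed: disk full" and "after".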
For the flow of the ElasticSearch index operation, see "ElasticSearch Index Analysis".
