Analysis of Elasticsearch Transport Module Creation and Startup


The earlier analysis of the Elasticsearch communication module introduced the overall functionality of the ES Transport module from a high level, which naturally raises the question: how does Elasticsearch start the servers that receive client operations such as INDEX and GET in the first place? This post analyzes how the Netty HTTP server is started in Elasticsearch 6.3.2. Starting an ES node means starting and initializing each of its services, which happens in the constructor of org.elasticsearch.node.Node, beginning with the creation of an org.elasticsearch.common.network.NetworkModule object. NetworkModule is where all of ES's network-communication functionality gets created and registered.

final NetworkModule networkModule = new NetworkModule(settings, false, pluginsService.filterPlugins(NetworkPlugin.class),
threadPool, bigArrays, pageCacheRecycler, circuitBreakerService, namedWriteableRegistry, xContentRegistry,
networkService, restController);

When the NetworkModule object is created, the main work is building two servers used for communication:

  • One server accepts the various operation requests issued by users (external REST clients), such as GET, INDEX, BULK WRITE, DELETE... This server is HttpServerTransport (concrete implementation: Netty4HttpServerTransport).
  • The other server handles communication between nodes (the transport layer), for example ping commands sent between nodes and cluster-state exchange. Also, when a user operation such as GET index/_doc/1 reaches the coordinator node but the document with docid 1 does not live on the local node, TcpTransport (concrete implementation: Netty4Transport) forwards the command to the target node.

A client can either be retrieved from a started org.elasticsearch.node.Node, or connected remotely to one or more nodes using org.elasticsearch.client.transport.TransportClient. Every node in the cluster can handle HTTP and Transport traffic by default. The transport layer is used exclusively for communication between nodes and the Java TransportClient; the HTTP layer is used only by external REST clients.
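As a concrete reference, here is a minimal sketch of connecting a 6.x TransportClient to the transport layer on its default port 9300. The class name and host are placeholders, and it assumes the org.elasticsearch.client:transport artifact is on the classpath:

    import java.net.InetAddress;
    import org.elasticsearch.client.transport.TransportClient;
    import org.elasticsearch.common.settings.Settings;
    import org.elasticsearch.common.transport.TransportAddress;
    import org.elasticsearch.transport.client.PreBuiltTransportClient;

    public class TransportClientDemo {
        public static void main(String[] args) throws Exception {
            // connect to a node's transport layer (default port 9300)
            try (TransportClient client = new PreBuiltTransportClient(Settings.EMPTY)
                    .addTransportAddress(new TransportAddress(InetAddress.getByName("localhost"), 9300))) {
                // use the client, e.g. client.prepareGet("index", "_doc", "1").get();
            }
        }
    }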

The Netty4HttpServerTransport object is created as follows; Netty4Transport is built with similar logic.
org.elasticsearch.common.network.NetworkModule#NetworkModule

    Map<String, Supplier<HttpServerTransport>> httpTransportFactory = plugin.getHttpTransports(settings, threadPool, bigArrays,
            circuitBreakerService, namedWriteableRegistry, xContentRegistry, networkService, dispatcher);
    for (Map.Entry<String, Supplier<HttpServerTransport>> entry : httpTransportFactory.entrySet()) {
        registerHttpTransport(entry.getKey(), entry.getValue());
    }

Netty4Plugin#getHttpTransports creates the Netty HTTP server, Netty4HttpServerTransport:

    @Override
    public Map<String, Supplier<HttpServerTransport>> getHttpTransports(Settings settings, ThreadPool threadPool, BigArrays bigArrays,
            CircuitBreakerService circuitBreakerService, NamedWriteableRegistry namedWriteableRegistry,
            NamedXContentRegistry xContentRegistry, NetworkService networkService, HttpServerTransport.Dispatcher dispatcher) {
        return Collections.singletonMap(NETTY_HTTP_TRANSPORT_NAME,
            () -> new Netty4HttpServerTransport(settings, networkService, bigArrays, threadPool, xContentRegistry, dispatcher));
    }

The constructed Transport object is then wrapped into a TransportService:

// obtain the constructed Netty4Transport
final Transport transport = networkModule.getTransportSupplier().get();
// wrap the Netty4Transport into a TransportService
final TransportService transportService = newTransportService(settings, transport, threadPool,
    networkModule.getTransportInterceptor(), localNodeFactory, settingsModule.getClusterSettings(), taskHeaders);

Any other module that needs communication then simply wraps the TransportService object. For example, TransportSearchAction, which executes user SEARCH operations, has an instance field SearchTransportService, and SearchTransportService wraps TransportService; that is how TransportSearchAction is able to communicate over TcpTransport. From the Node.java constructor:

// constructing the SearchTransportService requires the TransportService; the TransportService
// is a shared "communication object" that many services depend on
final SearchTransportService searchTransportService = new SearchTransportService(settings, transportService,
    SearchExecutionStatsCollector.makeWrapper(responseCollectorService));

A side note: the Services that the various Action objects depend on all appear to be created in the Node.java constructor; for example, the SearchTransportService and ClusterService that TransportSearchAction depends on are created at node startup.
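To make this wrapping pattern concrete, here is a hypothetical service (not an ES class; the name is made up) following the same shape as SearchTransportService:

    import org.elasticsearch.transport.TransportService;

    // Hypothetical example of the wrapping pattern: domain-specific methods are
    // layered on top of the shared TransportService, which does the actual I/O.
    public class MyModuleTransportService {
        private final TransportService transportService;

        public MyModuleTransportService(TransportService transportService) {
            this.transportService = transportService;
        }

        // domain-specific helpers would delegate to transportService.sendRequest(...)
    }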

Once the Netty4HttpServerTransport has been created, it needs to bind its port and start serving. org.elasticsearch.node.Node#start is the startup entry point for all services on an ES node (including, of course, the Netty HTTP server):

        if (NetworkModule.HTTP_ENABLED.get(settings)) {
            injector.getInstance(HttpServerTransport.class).start();
        }

Because Netty4HttpServerTransport extends AbstractLifecycleComponent, its startup goes through org.elasticsearch.common.component.AbstractLifecycleComponent#start, which calls doStart() to launch the Netty HTTP server and bind it to port 9200.
Netty4HttpServerTransport#doStart()

protected void doStart() {
        boolean success = false;
        try {
            this.serverOpenChannels = new Netty4OpenChannelsHandler(logger);

            serverBootstrap = new ServerBootstrap(); // observed in debug: workerCount=8, elasticsearch[debug_node][http_server_worker]
            // once a channel is assigned to an EventLoop thread of the EventLoopGroup,
            // all events on that channel are handled by that same EventLoop thread
            serverBootstrap.group(new NioEventLoopGroup(workerCount, daemonThreadFactory(settings,
                HTTP_SERVER_WORKER_THREAD_NAME_PREFIX)));
            // the server channel accepts connection requests; each established connection
            // gets a 'child channel' that handles all I/O events for that connection
            serverBootstrap.channel(NioServerSocketChannel.class);
            // bind a handler to each child channel; it processes the I/O events on that channel
            serverBootstrap.childHandler(configureServerChannelHandler()); // ---> Netty4HttpRequestHandler
            // child channel options (the parent channel accepts connections;
            // child channels carry the events of established connections)
            serverBootstrap.childOption(ChannelOption.TCP_NODELAY, SETTING_HTTP_TCP_NO_DELAY.get(settings));
            serverBootstrap.childOption(ChannelOption.SO_KEEPALIVE, SETTING_HTTP_TCP_KEEP_ALIVE.get(settings));
            // ---> TCP send buffer size
            final ByteSizeValue tcpSendBufferSize = SETTING_HTTP_TCP_SEND_BUFFER_SIZE.get(settings);
            if (tcpSendBufferSize.getBytes() > 0) {
                serverBootstrap.childOption(ChannelOption.SO_SNDBUF, Math.toIntExact(tcpSendBufferSize.getBytes()));
            }
            // ---> TCP receive buffer size
            final ByteSizeValue tcpReceiveBufferSize = SETTING_HTTP_TCP_RECEIVE_BUFFER_SIZE.get(settings);
            if (tcpReceiveBufferSize.getBytes() > 0) {
                serverBootstrap.childOption(ChannelOption.SO_RCVBUF, Math.toIntExact(tcpReceiveBufferSize.getBytes()));
            }

            serverBootstrap.option(ChannelOption.RCVBUF_ALLOCATOR, recvByteBufAllocator);
            serverBootstrap.childOption(ChannelOption.RCVBUF_ALLOCATOR, recvByteBufAllocator);

            final boolean reuseAddress = SETTING_HTTP_TCP_REUSE_ADDRESS.get(settings);
            serverBootstrap.option(ChannelOption.SO_REUSEADDR, reuseAddress);
            serverBootstrap.childOption(ChannelOption.SO_REUSEADDR, reuseAddress);

            this.boundAddress = createBoundHttpAddress(); // ---> ServerBootstrap binds the port
            if (logger.isInfoEnabled()) {
                logger.info("{}", boundAddress);
            }
            success = true;
        } finally {
            if (success == false) {
                doStop(); // otherwise we leak threads since we never moved to started
            }
        }
    }

The Netty HTTP server's worker thread count defaults to twice the number of available CPU cores on the node's machine: Runtime.getRuntime().availableProcessors() * 2.
Some other defaults:
TCP_NODELAY=true, SO_KEEPALIVE=true

ServerBootstrap(ServerBootstrapConfig(group: NioEventLoopGroup, channelFactory: NioServerSocketChannel.class, options: {RCVBUF_ALLOCATOR=io.netty.channel.FixedRecvByteBufAllocator@72ce8a9b, SO_REUSEADDR=true}, childGroup: NioEventLoopGroup, childOptions: {TCP_NODELAY=true, SO_KEEPALIVE=true, RCVBUF_ALLOCATOR=io.netty.channel.FixedRecvByteBufAllocator@72ce8a9b, SO_REUSEADDR=true}, childHandler: org.elasticsearch.http.netty4.Netty4HttpServerTransport$HttpChannelHandler@56ec6ac0))
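To make the bootstrap pattern tangible, here is a minimal, self-contained Netty HTTP server sketch (not ES code) that mirrors the defaults above; the port, buffer size, and pipeline wiring are illustrative only:

    import io.netty.bootstrap.ServerBootstrap;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.ChannelOption;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioServerSocketChannel;
    import io.netty.handler.codec.http.HttpObjectAggregator;
    import io.netty.handler.codec.http.HttpServerCodec;

    public final class MiniHttpServer {
        public static void main(String[] args) throws Exception {
            // same sizing rule as ES's default worker pool
            int workerCount = Runtime.getRuntime().availableProcessors() * 2;
            EventLoopGroup group = new NioEventLoopGroup(workerCount);
            try {
                ServerBootstrap b = new ServerBootstrap()
                    .group(group)
                    .channel(NioServerSocketChannel.class)
                    // same child options as the ES defaults shown above
                    .childOption(ChannelOption.TCP_NODELAY, true)
                    .childOption(ChannelOption.SO_KEEPALIVE, true)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) {
                            ch.pipeline().addLast(new HttpServerCodec());
                            ch.pipeline().addLast(new HttpObjectAggregator(1 << 20));
                            // a business handler would go here, playing the role
                            // that Netty4HttpRequestHandler plays in ES
                        }
                    });
                b.bind(9200).sync().channel().closeFuture().sync();
            } finally {
                group.shutdownGracefully();
            }
        }
    }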

Where is the starting point at which the ES server handles user requests (GET/WRITE/DELETE...)?

Since the ES server (for lack of a better name) is built on Netty, there must be a ChannelHandler responsible for the events occurring on each SocketChannel. That handler is org.elasticsearch.http.netty4.Netty4HttpRequestHandler.
It is registered in org.elasticsearch.http.netty4.Netty4HttpServerTransport.HttpChannelHandler#initChannel, so user requests are handed off to Netty4HttpRequestHandler:

            ch.pipeline().addLast("handler", requestHandler); // Netty4HttpRequestHandler handles the business logic

Given how Netty works, the starting point for handling user requests is then unquestionably org.elasticsearch.http.netty4.Netty4HttpRequestHandler#channelRead0.
So to debug the entry point of INDEX, GET, or DELETE operations, set a breakpoint at the entry point org.elasticsearch.http.netty4.Netty4HttpRequestHandler#channelRead0 and another at the response side, org.elasticsearch.http.netty4.Netty4HttpChannel#sendResponse, then follow each operation's execution path with IDEA's Debugger frames stack view.
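For orientation, here is a heavily simplified sketch of the handler's shape (the real Netty4HttpRequestHandler also deals with HTTP pipelining, buffer release, and error responses):

    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.SimpleChannelInboundHandler;
    import io.netty.handler.codec.http.FullHttpRequest;

    // Simplified sketch, not the actual ES class
    class HttpRequestHandlerSketch extends SimpleChannelInboundHandler<FullHttpRequest> {
        @Override
        protected void channelRead0(ChannelHandlerContext ctx, FullHttpRequest request) {
            // wrap the Netty request, then hand it to the REST dispatcher, which
            // routes it to the matching RestHandler (RestGetAction, RestDeleteAction, ...)
        }
    }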

Given that all user operations enter through this single point, how are they parsed and ultimately routed to the appropriate TransportXXXAction? Roughly as follows:

  • 1. Every ES operation (Java API / REST API) has a corresponding Action class; for example, the DELETE API maps to RestDeleteAction and the GET API maps to RestGetAction.
  • 2. Each Action class overrides org.elasticsearch.rest.BaseRestHandler#prepareRequest to build the corresponding request and returns a lambda representing the operation to execute. That operation is then triggered by the statement action.accept(channel) in BaseRestHandler#handleRequest.
  • 3. Once triggered, these Action operations are sent by NodeClient#doExecute to the appropriate node for execution: it first obtains the TransportXXXAction class corresponding to the Action, then runs it via execute(request, listener):
        return transportAction(action).execute(request, listener)
    
      TransportAction#execute(Request, org.elasticsearch.action.ActionListener<Response>) is the common entry point for executing every Action; ultimately TransportAction.RequestFilterChain#proceed calls `this.action.doExecute(task, request, listener);`, invoking the concrete TransportXXXAction#doExecute() that performs the operation (see the sketch after this list).
    

For example, GET operations are handled by TransportSingleShardAction#doExecute, and DELETE operations by TransportBulkAction#doExecute(Task, BulkRequest, ActionListener).

  • 4. Digging deeper into DELETE: TransportBulkAction#doExecute calls org.elasticsearch.action.bulk.TransportBulkAction#executeBulk to start a new task, BulkOperation. Because a DELETE is a shard-level operation (data must be removed from a shard), org.elasticsearch.action.bulk.TransportBulkAction.BulkOperation#doRun determines that the operation is a DELETE and executes shardBulkAction.execute(bulkShardRequest, new ActionListener<BulkShardResponse>(){...});, handing the delete over to the shard-level action, TransportShardBulkAction.

  • 5. TransportShardBulkAction extends TransportAction, so execute of course takes the "same" path into TransportAction#execute(Task, Request, ActionListener) and then into RequestFilterChain#proceed, where this.action.doExecute(task, request, listener); now calls TransportShardBulkAction's doExecute method. That method is inherited from TransportReplicationAction, and what runs there is a ReroutePhase task. This is easy to understand: to delete a document you have to know which shard holds it and send the delete request to that shard, which is presumably why ReroutePhase exists:

    protected void doExecute(Task task, Request request, ActionListener<Response> listener) {
        new ReroutePhase((ReplicationTask) task, request, listener).run();
    }
    
  • 6. Following ReroutePhase#doRun: if the delete targets the local node it runs performLocalAction, and if it targets a remote node it runs performRemoteAction. Here the request is sent off yet again via TransportService#sendRequest... annoying, so let's keep tracing and see where it tumbles to next:

    if (primary.currentNodeId().equals(state.nodes().getLocalNodeId())) {
        performLocalAction(state, primary, node, indexMetaData);
    } else {
        performRemoteAction(state, primary, node);
    }
    
  • 7. So where did it land? This is actually easy to work out: it is a DELETE, the Action executing it is TransportReplicationAction, and a DELETE must go through the primary shard. Sure enough, the receiving method turns up in TransportReplicationAction's inner class PrimaryOperationTransportHandler: PrimaryOperationTransportHandler#messageReceived(ConcreteShardRequest, TransportChannel, Task). It creates an AsyncPrimaryAction task, and TransportReplicationAction.AsyncPrimaryAction#doRun is where ES actually starts acquiring the shard lock and deleting the document.

  • 8. Once AsyncPrimaryAction#doRun successfully acquires the lock (a PrimaryShardReference), it calls back into AsyncPrimaryAction#onResponse, where createReplicatedOperation(...).execute() triggers the underlying Lucene deletion logic.
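As promised above, here is a hedged, heavily simplified sketch of the execute → filter chain → doExecute dispatch pattern. These class names are illustrative, not the actual ES source; the real TransportAction also creates a Task and applies registered ActionFilters in order:

    import org.elasticsearch.action.ActionListener;

    // Simplified illustration of the dispatch pattern described in steps 3 and 5
    abstract class SketchTransportAction<Request, Response> {

        public final void execute(Request request, ActionListener<Response> listener) {
            // in ES, TransportAction.RequestFilterChain#proceed walks the filter
            // chain, and its last link calls the concrete action's doExecute(...)
            doExecute(request, listener);
        }

        // each concrete TransportXXXAction overrides this with its real logic
        protected abstract void doExecute(Request request, ActionListener<Response> listener);
    }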

Deletes follow a deletion strategy, implemented in org.elasticsearch.index.engine.InternalEngine#planDeletionAsPrimary:

if (versionValue == null) {
            currentVersion = Versions.NOT_FOUND;
            currentlyDeleted = true;
        } else {
            currentVersion = versionValue.version;
            currentlyDeleted = versionValue.isDelete();
        }
        final DeletionStrategy plan;
        if (delete.versionType().isVersionConflictForWrites(currentVersion, delete.version(), currentlyDeleted)) {
            final VersionConflictEngineException e = new VersionConflictEngineException(shardId, delete, currentVersion, currentlyDeleted);
            plan = DeletionStrategy.skipDueToVersionConflict(e, currentVersion, currentlyDeleted);
        } else {
            plan = DeletionStrategy.processNormally(
                    currentlyDeleted,
                    generateSeqNoForOperation(delete),
                    delete.versionType().updateVersion(currentVersion, delete.version()));
        }
        return plan;
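A quick illustration of the conflict check above, assuming the 6.x VersionType API; the version numbers are made up for the example:

    import org.elasticsearch.index.VersionType;

    public class VersionCheckDemo {
        public static void main(String[] args) {
            // current version on the shard is 3 and the doc is not deleted:
            // expecting version 2 is a conflict, expecting version 3 is not
            System.out.println(VersionType.INTERNAL.isVersionConflictForWrites(3, 2, false)); // true
            System.out.println(VersionType.INTERNAL.isVersionConflictForWrites(3, 3, false)); // false
        }
    }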

When deleting a doc, ES also needs to determine whether the docid currently exists; that lookup is implemented in org.elasticsearch.index.engine.InternalEngine#loadCurrentVersionFromIndex:

    private long loadCurrentVersionFromIndex(Term uid) throws IOException {
        assert incrementIndexVersionLookup();
        try (Searcher searcher = acquireSearcher("load_version", SearcherScope.INTERNAL)) {
            return VersionsAndSeqNoResolver.loadVersion(searcher.reader(), uid);
        }
    }

Another detail found while reading the source: delete-by-doc-id does not create new segments, so ES does not throttle these deletes even when merges fall behind. That is why delete-by-id is fast and has very little impact on cluster load:

    // NOTE: we don't throttle this when merges fall behind because delete-by-id does not create new segments:

The Lucene-level document deletion finally happens in org.elasticsearch.index.engine.InternalEngine#delete:

    if (delete.origin() == Operation.Origin.PRIMARY) {
        plan = planDeletionAsPrimary(delete);
    } else {
        plan = planDeletionAsNonPrimary(delete);
    }

    if (plan.earlyResultOnPreflightError.isPresent()) {
        deleteResult = plan.earlyResultOnPreflightError.get();
    } else if (plan.deleteFromLucene) {
        deleteResult = deleteInLucene(delete, plan);
    } else {
        deleteResult = new DeleteResult(
                plan.versionOfDeletion, plan.seqNoOfDeletion, plan.currentlyDeleted == false);
    }

The concrete implementation lives in org.elasticsearch.index.engine.InternalEngine#deleteInLucene; the code is not reproduced here. That is the complete execution flow of an ES delete-by-doc-id; interested readers can dig further. For a rough feel of what a Lucene-level delete looks like, see the sketch below.
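For intuition, here is a hedged sketch of a delete-by-term using plain Lucene. This is NOT the actual InternalEngine#deleteInLucene, which additionally writes tombstones, tracks sequence numbers, and records the operation in the translog; the index path is a placeholder:

    import java.nio.file.Paths;
    import org.apache.lucene.analysis.standard.StandardAnalyzer;
    import org.apache.lucene.index.IndexWriter;
    import org.apache.lucene.index.IndexWriterConfig;
    import org.apache.lucene.index.Term;
    import org.apache.lucene.store.FSDirectory;

    public class LuceneDeleteSketch {
        public static void main(String[] args) throws Exception {
            try (FSDirectory dir = FSDirectory.open(Paths.get("/tmp/index"));
                 IndexWriter writer = new IndexWriter(dir, new IndexWriterConfig(new StandardAnalyzer()))) {
                // delete every document whose "_id" term matches "1"
                writer.deleteDocuments(new Term("_id", "1"));
                writer.commit();
            }
        }
    }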

To wrap up: this post walked through the DELETE API's execution path in detail; other operations are similar and can be analyzed the same way.
Original post: https://www.cnblogs.com/hapjin/p/11018479.html

