Vert.x 線程模型揭秘



來源:鳥窩,

colobu.com/2016/03/31/vertx-thread-model/

如有好文章投稿,請點擊 → 這里了解詳情


Vert.x是一個在JVM開發reactive應用的框架,可用於開發異步、可伸縮、高並發的Web應用(雖然不限於web應用)。其目的在於為JVM提供一個Node.js的替代方案。開發者可以通過它使用JavaScript、Ruby、Groovy、Java,甚至是混合語言來編寫應用。


使用Vertx.x框架,可以用JavaScript、CoffeeScript、Ruby、Python、Groovy或Java開發應用程序的組件,最終應用程序可以是混合語言構建的。


本文試圖揭示Vert.x的線程模型的應用,通過源代碼分析Vert.x如何使用線程池處理請求的,以及比較Netty和Vert.x使用線程池的異同。


也許你覺得奇怪,默認啟動一個Vert.x Verticle實例,它只用一個線程處理事件,在多核的情況下你需要創建多個Verticle實例以充分利用多個CPU Core的性能。


Vert.x 實例


首先先啰嗦地介紹一些Vert.x概念,熟悉Vert.x開發的朋友可以跳過這一節

在Vert.x里,如果你不使用Vertx對象,你幾乎是寸步難行。


Vertx對象扮演着Vert.x控制中心的角色,同時它也提供了大量的功能,例如:


  • 編寫TCP客戶端和服務器

  • 編寫HTTP客戶端和服務器,包括websocket

  • Event bus

  • 共享數據

  • 定時器

  • 發布和卸載Verticle

  • UDP

  • DNS client

  • 文件系統訪問

  • 高可用

  • 集群


如果你將Vert.x嵌入到你的應用程序中,你可以向下面這樣獲得一個Vertx對象的引用


Vertx vertx = Vertx.vertx();


當你實例化Vertx對象時,如果你感覺默認的參數不符合你的需求,你可以指定實例化時的參數:


Vertx vertx = Vertx.vertx(new VertxOptions().setWorkerPoolSize(40));


VertxOptions對象擁有很多關於Vertx實例設置,例如配置集群,高可用設置,線程池大小以及等等其他參數。下面就介紹一下它的線程池。


1 線程池


1、eventLoopGroup


這個對象是NioEventLoopGroup的一個實例,它的線程池的大小由options.getEventLoopPoolSize()決定,如果沒有設置,默認為CPU核數 * 2。


eventLoopThreadFactory = new VertxThreadFactory("vert.x-eventloop-thread-", checker, false);

   eventLoopGroup = new NioEventLoopGroup(options.getEventLoopPoolSize(), eventLoopThreadFactory);

   eventLoopGroup.setIoRatio(NETTY_IO_RATIO);


它的EventLoop和一個Context對應:


protected ContextImpl(……) {

    ……

    EventLoopGroup group = vertx.getEventLoopGroup();

    if (group != null) {

      this.eventLoop = group.next();

    } else {

      this.eventLoop = null;

    }

    ……

}


它用來執行標准的Verticle。


2、WorkerPool


用來執行worker Verticle。


workerPool = Executors.newFixedThreadPool(options.getWorkerPoolSize(),

                new VertxThreadFactory("vert.x-worker-thread-", checker, true));


3、Internal Blocking Pool


內部使用的線程池,可以用來將阻塞代碼異步化。


internalBlockingPool = Executors.newFixedThreadPool(options.getInternalBlockingPoolSize(),

                new VertxThreadFactory("vert.x-internal-blocking-", checker, true));


不要在event loop中執行阻塞操作, 比如訪問數據庫或者網絡資源,這絕對會影響你的應用的性能。對於這些阻塞操作,你可以將它們異步化:


vertx.executeBlocking(future -> {

  // 下面這行代碼可能花費很長時間

  String result = someAPI.blockingMethod("hello");

  future.complete(result);

}, res -> {

  System.out.println("The result is: " + res.result());

});


默認情況下executeBlocking會在同一個context中執行(同一個verticle實例),它們會串行化執行。如果不關心這個執行的順序,可以將ordered參數設為false,它們會在worker pool線程池中並行的執行。


另外一種執行阻塞代碼的方式就是使用worker verticle,worker verticle總是在worker pool線程池中執行。


2 Verticle


Verticle有點類似Actor模型,也可以實現並發的,可擴展的,易於發布的模型。


一個vert.x應用可以包含多個verticle實例,實例之間可以通過event bus通訊。


2.1 三種類型


http://vertx.io/docs/vertx-core/java/#_verticle_types


1、Standard Verticle: 最通用的類型,總是在event loop中執行。


2、Worker Verticle:它們使用worker pool線程池運行。一個verticle實例絕對不會在兩個或者更多線程中並發執行。


3、Multi-threaded worker verticle:它們使用worker pool線程池運行。 一個verticle實例可以在多個線程中並發執行。


實現一個Verticle很簡單:


public class MyVerticle extends AbstractVerticle {

  // 當發布verticle時調用

  public void start() {

  }

  // 可以不實現。當 verticle 卸載時調用

  public void stop() {

  }

}


2.2 發布方式


1、命令行方式


vertx run SomeJavaSourceFile.java


或者通過maven-shade-plugin打包成一個fat包:


<transformers>

    <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">

        <manifestEntries>

            <Main-Class>io.vertx.core.Starter</Main-Class>

            <Main-Verticle>com.colobu.vertx.Main</Main-Verticle>

        </manifestEntries>

    </transformer>

    <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">

        <resource>META-INF/services/io.vertx.core.spi.VerticleFactory</resource>

    </transformer>

</transformers>


然后運行 java -jar xxx-fat.jar,你還可以傳遞一些參數。


2、編程方式


你也可以編程的方式,通過vertx.deployVerticle發布:


public class Main  extends AbstractVerticle {

    public static void main(String[] args) {

        VertxOptions vo = new VertxOptions();

        vo.setEventLoopPoolSize(16);

        Vertx vertx = Vertx.vertx(vo);

        DeploymentOptions options = new DeploymentOptions();

        options.setInstances(100);

        vertx.deployVerticle(Main.class.getName(), options, e -> {

            System.out.println(e.succeeded());

            System.out.println(e.failed());

            System.out.println(e.cause());

            System.out.println(e.result());

        });

    }

    @Override

    public void start() {

        Handler<HttpServerRequest> handler = e -> {

            HttpServerResponse response = e.response();

            response.putHeader("content-type", "application/json").end("Hello world");

        };

        vertx.createHttpServer().requestHandler(handler).listen(8080);

    }

}


Verticle發布和Vert.x線程模型


以上比較啰嗦,主要介紹了一些Vert.x的一些概念。下面是我想重點介紹的內容。


本節以實現一個簡單的http server為例(編程方式發布Verticle),分析 vert.x 的線程和Verticle的關系。只分析標准的Verticle。代碼如上。


1 Verticle發布過程


首先先創建一個Vertx實例,可以你可以通過VertxOptions設置線程池的大小。上面的例子中設置Event Loop線程池的大小為16:


vo.setEventLoopPoolSize(16);


因此即使你創建幾百個Verticle,也只會有16個Event Loop處理它們,你可以通過jstack查看這些線程。你會看到多個名為vert.x-eventloop-thread-<num>的線程,一個vertx-blocked-thread-checker線程,一個vert.x-acceptor-thread-0。


調用void deployVerticle(String name, DeploymentOptions options, Handler<AsyncResult<String>> completionHandler)方法發布Verticle。

DeploymentOptions對象可以設置發布參數,比如是否是worker verticle,多線程worker verticle, ha, 隔離組等, 重要的是instances,它用來指定分布的Verticle實例的數量,默認是一個。


底層調用DeploymentManager的doDeployVerticle來實現,它會根據實例數創建相應多的Verticle,然后調用doDeploy發布這些Verticle:


Verticle[] verticles = createVerticles(verticleFactory, identifier, options.getInstances(), cl);


我將doDeploy方法簡化,讓我們看一下關鍵代碼:


private void doDeploy(String identifier, String deploymentID, DeploymentOptions options,

                          ContextImpl parentContext,

                          ContextImpl callingContext,

                          Handler<AsyncResult<String>> completionHandler,

                          ClassLoader tccl, Verticle... verticles) {

        //准備工作

        ……

 

        for (Verticle verticle: verticles) {

            //創建上下文

            ContextImpl context = options.isWorker() ? vertx.createWorkerContext(options.isMultiThreaded(), deploymentID, conf, tccl) :

                    vertx.createEventLoopContext(deploymentID, conf, tccl);

 

            deployment.addVerticle(new VerticleHolder(verticle, context));

            context.runOnContext(v -> {

                try {

                    verticle.init(vertx, context);

                    Future<Void> startFuture = Future.future();

                    verticle.start(startFuture);

                    startFuture.setHandler(……);

                } catch (Throwable t) {}

            });

        }

    }


可以看到#11 行創建了一個上下文ContextImpl, 因為本例中我們不用worker模式,所以這個上下文是通過vertx.createEventLoopContext(deploymentID, conf, tccl)創建的。每個verticle都會創建一個新的上下文,因此verticle和上下文是意義對應的。


#17 行初始化verticle,#19 行啟動這個verticle。還記得我們的例子中實現的start方法嗎,它會在這里被調用。


這樣,多個verticle實例被發布了。


2 線程模型


首先插播一下Netty的線程模型,不感興趣的可以略過。


2.1 Netty的線程模型


雖然Vert.x底層籍由Netty實現,但是它的處理方式與Netty NIO的線程模型是不同的。


(以下談論的Netty線程模型是指NIO的情況)

比如下面的Netty代碼片段:


EventLoopGroup parentGroup = new NioEventLoopGroup(1);

EventLoopGroup childGroup = new NioEventLoopGroup(50);

try {

           ServerBootstrap b = new ServerBootstrap();

           b.group(parentGroup, childGroup)

                   .channel(NioServerSocketChannel.class)

                   .childHandler(new ChannelInitializer<SocketChannel>(){……});

           Channel ch = b.bind("0.0.0.0",8080).sync().channel();

           ch.closeFuture().sync();

       } finally {

           parentGroup.shutdownGracefully();

           childGroup.shutdownGracefully();

       }


NioEventLoopGroup代表一組EventLoop,每個EventLoop映射一個線程,每個Channel注冊一個EventLoop,但是一個EventLoop可以關聯多個Channel。

parentGroup用來處理Accept事件,而childGroup用來處理其余的IO事件。當有並發連接的時候,Handler會在childGroup線程池中執行。你可以指定childGroup的線程數量,如果沒有指定,則從系統屬性中讀取”io.netty.eventLoopThreads”,如果這個屬性沒有設置,則使用CPU核數 2 (Runtime.getRuntime().availableProcessors() 2))。一般parentGroup設置為1,我們只需要一個Acceptor處理客戶端的連接即可。


當有多個並發連接的時候,每個連接/Channel被分配到一個EventLoop上。EventLoop選擇是均勻地 (如果線程數是2的n次方,可以用比較快的選擇方法PowerOfTwoEventExecutorChooser):


private final class PowerOfTwoEventExecutorChooser implements EventExecutorChooser {

    @Override

    public EventExecutor next() {

        return children[childIndex.getAndIncrement() & children.length - 1];

    }

}

private final class GenericEventExecutorChooser implements EventExecutorChooser {

    @Override

    public EventExecutor next() {

        return children[Math.abs(childIndex.getAndIncrement() % children.length)];

    }

}


因此一旦如果某個EventLoop處理慢了,則這個線程上的event可能出現堆積。

比如下面的代碼故意在某個線程上處理慢一些,導致這個EventLoop上出現堆積,Netty並沒有根據壓力將時間分配到其它處理快的EventLoop上。


public class HelloServerHandler extends ChannelInboundHandlerAdapter {

    ……

 

    @Override

    public void channelRead(ChannelHandlerContext ctx, Object msg) {

        String name = Thread.currentThread().getName();

        System.out.println(name);

        if (name.endsWith("-5")) {

            try {

                Thread.sleep(1000);

            } catch (InterruptedException e) {

                e.printStackTrace();

            }

        }

        ……


輸出結果可以看到nioEventLoopGroup-3-5處理了同樣多的請求,而且都堆積在后面了。


……

nioEventLoopGroup-3-19

nioEventLoopGroup-3-18

nioEventLoopGroup-3-19

nioEventLoopGroup-3-18

nioEventLoopGroup-3-20

nioEventLoopGroup-3-20

nioEventLoopGroup-3-5

nioEventLoopGroup-3-5

nioEventLoopGroup-3-5

nioEventLoopGroup-3-5

nioEventLoopGroup-3-5

nioEventLoopGroup-3-5

nioEventLoopGroup-3-5

nioEventLoopGroup-3-5

nioEventLoopGroup-3-5


因此,我們可以了解到,當啟動一個NIO方式的Netty實例的時候,它會使用一個線程池來處理http請求。


Netty 4.0的線程模型被很好的重定義,一個ChannelHandler實例的方法不會被並發的調用,除非它被@Sharable標記,因此你不應該增加一個ChannelHandler 實例多次。當你增加一個handler到ChannelPipeline中時,你可以指定一個特定的EventExecutorGroup來執行這個handler。如果沒有指定,則使用Channel注冊的EventLoop來執行。如果兩個Handler被指定不同的EventExecutorGroup,則它們會並發執行,因此如果它們會訪問共享數據的化,你需要關注並發控制的問題。更多內容可以查看 Netty的文檔。


2.2 Vert.x的線程模型


Vert.x如何在線程中處理事件的呢,還是以我們的例子分析。


回顧一下我們實現的Verticle的start方法。


@Override

    public void start() {

        Handler<HttpServerRequest> handler = e -> {

            HttpServerResponse response = e.response();

            response.putHeader("content-type", "application/json").end("Hello world");

        };

        vertx.createHttpServer().requestHandler(handler).listen(8080);

    }


在這個start方法中,我們創建了一個http server,讓它監聽 8080端口, http request的處理交給handler執行。 那么監聽線程是哪一個?handler又是在哪個線程池中執行的呢?調用多個Verticle實例的方法為什么沒有出現”地址/端口被占用”的異常呢?


首先vertx.createHttpServer()會創建一個HttpServerImpl對象,可以通過HttpServerOptions配置更多的參數,每個Verticle實例都會創建一個HttpServerImpl對象。requestHandler(handler)方法設置處理器,你還可以使用Vert.x-Web設置路由的功能。


listen(8080)啟動http 服務器,它實際調用netty實現的。

我將listen方法簡化,去除一些檢查代碼和回調處理,只保留關鍵代碼如下:


public synchronized HttpServer listen(int port, String host, Handler<AsyncResult<HttpServer>> listenHandler) {        

        listenContext = vertx.getOrCreateContext();

        listening = true;       

        synchronized (vertx.sharedHttpServers()) {

            id = new ServerID(port, host);

            HttpServerImpl shared = vertx.sharedHttpServers().get(id);

            if (shared == null) {

                serverChannelGroup = new DefaultChannelGroup("vertx-acceptor-channels", GlobalEventExecutor.INSTANCE);

                ServerBootstrap bootstrap = new ServerBootstrap();

                bootstrap.group(vertx.getAcceptorEventLoopGroup(), availableWorkers);

                bootstrap.channelFactory(new VertxNioServerChannelFactory());

 

                bootstrap.childHandler(new ChannelInitializer<Channel>() {

                    @Override

                    protected void initChannel(Channel ch) throws Exception {

                        ……

                        pipeline.addLast("handler", new ServerHandler());

                    }

                });

                addHandlers(this, listenContext);

 

                vertx.sharedHttpServers().put(id, this);

                actualServer = this;

            } else {

                // Server already exists with that host/port - we will use that

                actualServer = shared;

                addHandlers(actualServer, listenContext);

                metrics = vertx.metricsSPI().createMetrics(this, new SocketAddressImpl(port, host), options);

            }

 

        }

        return this;

    }


#6 行可以看到它會檢查使用這個IP地址和端口的http server是否存在,如果存在的化直接跳到# 27行。因此回答上面的問題,多個Verticle實例不會引起沖突,因為它們會共享同一個http server。


這個http server通過netty ServerBootstrap創建。#10 行可以看到acceptor是一個單線程執行的,acceptorEventLoopGroup在VertxImpl中定義。


acceptorEventLoopGroup = new NioEventLoopGroup(1, acceptorEventLoopThreadFactory);


#10 行還顯示,netty的IO worker線程池由availableWorkers確定,它是一個VertxEventLoopGroup對象。VertxEventLoopGroup類擴展AbstractEventExecutorGroup,實現了EventLoopGroup接口:


……

@Override

public synchronized EventLoop next() {

  if (workers.isEmpty()) {

    throw new IllegalStateException();

  } else {

    EventLoop worker = workers.get(pos).worker;

    pos++;

    checkPos();

    return worker;

  }

}

public synchronized void addWorker(EventLoop worker) {

  EventLoopHolder holder = findHolder(worker);

  if (holder == null) {

    workers.add(new EventLoopHolder(worker));

  } else {

    holder.count++;

  }

……


線程的數量由worker的數量決定,worker的類型是EventLoop,對應一個線程,有多少worker就會有多少線程。


通過addWorker可以增加線程的數量,worker不會重復。


回到剛才的listen方法, #21 行addHandlers方法會配置handler在哪一個event loop中執行:


private void addHandlers(HttpServerImpl server, ContextImpl context) {

  if (requestStream.handler() != null) {

    server.reqHandlerManager.addHandler(requestStream.handler(), context);

  }

  if (wsStream.handler() != null) {

    server.wsHandlerManager.addHandler(wsStream.handler(), context);

  }

}


server.reqHandlerManager.addHandler方法如下:


public synchronized void addHandler(Handler<T> handler, ContextImpl context) {

  EventLoop worker = context.nettyEventLoop();

  availableWorkers.addWorker(worker);

  Handlers<T> handlers = new Handlers<>();

  Handlers<T> prev = handlerMap.putIfAbsent(worker, handlers);

  if (prev != null) {

    handlers = prev;

  }

  handlers.addHandler(new HandlerHolder<>(context, handler));

  hasHandlers = true;

}


#2 行得到這個上下文的EventLoop。 還記得上下文的EventLoop怎么創建出來的嗎?每個Verticle實例關聯一個上下文,因此一個Verticle實例只會創建一個worker。


把這個worker加入到availableWorkers,這樣就增加了一個事件處理線程。


因此我們可以看出正常情況下Vert.x的每個Verticle實例只會用一個線程處理請求,在多核情況下一定要配置instance的數量。


如果配置的instance的數量大於eventLoopPoolSize數量,那么就會有一個Event Loop處理多個instance的情況。 線程配置的過多有時不會帶來性能的提升,由於線程也有context swicthing,反而會帶來性能的降低。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM