配置broker時,都會設置connector,connector內部會持有一個TransportServer,TransportServer相當於socketserver,transport相當於socket,connector啟動(start)時會引發socketserver的啟動(start),TcpTransportServer的主要職責就是accept(socketserver的主要職責就是accept)
1 //org.apache.activemq.transport.tcp.TcpTransportServer的start 2 protected void doStart() throws Exception { 3 if (useQueueForAccept) { 4 Runnable run = new Runnable() { 5 @Override 6 public void run() { 7 try { 8 //socketQueue是一個阻塞隊列,不斷的從socketQueue中獲得socket,然后執行handleSocket方法 9 while (!isStopped() && !isStopping()) { 10 Socket sock = socketQueue.poll(1, TimeUnit.SECONDS); 11 if (sock != null) { 12 try { 13 //handleSocket最終會調用getAcceptListener().onAccept(configuredTransport) 14 handleSocket(sock); 15 } catch (Throwable thrown) { 16 if (!isStopping()) { 17 onAcceptError(new Exception(thrown)); 18 } else if (!isStopped()) { 19 LOG.warn("Unexpected error thrown during accept handling: ", thrown); 20 onAcceptError(new Exception(thrown)); 21 } 22 } 23 } 24 } 25 26 } catch (InterruptedException e) { 27 if (!isStopped() || !isStopping()) { 28 LOG.info("socketQueue interrupted - stopping"); 29 onAcceptError(e); 30 } 31 } 32 } 33 }; 34 socketHandlerThread = new Thread(null, run, "ActiveMQ Transport Server Thread Handler: " + toString(), getStackSize()); 35 socketHandlerThread.setDaemon(true); 36 socketHandlerThread.setPriority(ThreadPriorities.BROKER_MANAGEMENT - 1); 37 //啟動線程,該線程循環處理accept到的socket 38 socketHandlerThread.start(); 39 } 40 //TcpTransportServer繼承了TransportServerThreadSupport,其本身也持有持有一個線程, 41 //該線程用來執行的run邏輯,TcpTransportServer的run邏輯就是accept。 42 super.doStart(); 43 }
TcpTransportServer啟動后執行的run方法
1 /** 2 * 就是accept 3 * pull Sockets from the ServerSocket 4 */ 5 @Override 6 public void run() { 7 if (!isStopped() && !isStopping()) { 8 final ServerSocket serverSocket = this.serverSocket; 9 if (serverSocket == null) { 10 onAcceptError(new IOException("Server started without a valid ServerSocket")); 11 } 12 //channel不為null說明是nio 13 final ServerSocketChannel channel = serverSocket.getChannel(); 14 if (channel != null) { 15 //nio 16 doRunWithServerSocketChannel(channel); 17 } else { 18 //以bio為例 19 doRunWithServerSocket(serverSocket); 20 } 21 } 22 }
TcpTransportServer的run方法就是accept
1 private void doRunWithServerSocket(final ServerSocket serverSocket) { 2 while (!isStopped()) { 3 //accept 4 Socket socket = serverSocket.accept(); 5 if (socket != null) { 6 if (isStopped() || getAcceptListener() == null) { 7 socket.close(); 8 } else { 9 //默認useQueueForAccept為true,socketQueue中的Socket最終會由socketHandlerThread取出並執行handleSocket(socket), 10 //handleSocket最終會調用getAcceptListener().onAccept(configuredTransport)。 11 if (useQueueForAccept) { 12 socketQueue.put(socket); 13 } else { 14 //handleSocket最終會調用getAcceptListener().onAccept(configuredTransport) 15 handleSocket(socket); 16 } 17 } 18 } 19 } 20 }
在org.apache.activemq.broker.TransportConnector啟動時,設置了AcceptListener(getServer().setAcceptListener)該AcceptListener的主要邏輯:
1 brokerService.getTaskRunnerFactory().execute(new Runnable() { 2 @Override 3 public void run() { 4 try { 5 if (!brokerService.isStopping()) { 6 //transport中含有Socket,transport代表client 7 Connection connection = createConnection(transport); 8 //connection.start會執行transport.start,TcpTransport繼承了TransportServerThreadSupport, 9 //start會開啟該線程的執行邏輯,即TcpTransport的run方法,阻塞讀取socket數據。 10 connection.start(); 11 } else { 12 throw new BrokerStoppedException("Broker " + brokerService + " is being stopped"); 13 } 14 } catch (Exception e) { 15 String remoteHost = transport.getRemoteAddress(); 16 ServiceSupport.dispose(transport); 17 onAcceptError(e, remoteHost); 18 } 19 } 20 });
accept到的socket被包裝為TcpTransport,並start該Transport,socket就可以開始讀數據了
1 //TcpTransport的run方法會調用doRun 2 protected void doRun() throws IOException { 3 try { 4 //從Socket讀數據 5 Object command = readCommand(); 6 doConsume(command);//執行命令,比如如果該command是一個Message,則會將其放入對應的destination 7 } catch (SocketTimeoutException e) { 8 } catch (InterruptedIOException e) { 9 } 10 }
小結:connector.start引發了TransportServer的start,TransportServer的start主要就是socketserver.accept(在獨立線程中),accept到的socket可以放入隊列異步處理,也可以同步處理(在accept所在線程中),二者的執行邏輯是一致的,都會將socket封裝成transport,再將transport封裝成connection后進行connection.start,這就開始阻塞讀了(transport會持有一個獨立線程,阻塞讀就在該線程中)。
TcpTransport的讀線程會阻塞在readCommand上,當client有數據傳來時,readCommand會讀取數據,並將其unmarshal為activemq的命令對象,然后再doConsume該對象。doConsume最終會調用broker(TransportConnection持有broker)的相關方法(比如,如果command是Message,就會調用broker.send(producerExchange, messageSend);),這就將Message交由broker處理了(接收client發來的命令並不屬於broker的職責,broker真正要做的是將處理這些命令,比如將消息路由置對應的destination,而接收client命令的任務是由TransportServer完成的)。