activemq-broker接收消息



配置broker時,都會設置connector,connector內部會持有一個TransportServer,TransportServer相當於socketserver,transport相當於socket,connector啟動(start)時會引發socketserver的啟動(start),TcpTransportServer的主要職責就是accept(socketserver的主要職責就是accept)

 1 //org.apache.activemq.transport.tcp.TcpTransportServer的start
 2 protected void doStart() throws Exception {
 3   if (useQueueForAccept) {
 4     Runnable run = new Runnable() {
 5       @Override
 6       public void run() {
 7         try {
 8             //socketQueue是一個阻塞隊列,不斷的從socketQueue中獲得socket,然后執行handleSocket方法
 9           while (!isStopped() && !isStopping()) {
10             Socket sock = socketQueue.poll(1, TimeUnit.SECONDS);
11             if (sock != null) {
12               try {
13                   //handleSocket最終會調用getAcceptListener().onAccept(configuredTransport)
14  handleSocket(sock);
15               } catch (Throwable thrown) {
16                 if (!isStopping()) {
17                   onAcceptError(new Exception(thrown));
18                 } else if (!isStopped()) {
19                   LOG.warn("Unexpected error thrown during accept handling: ", thrown);
20                   onAcceptError(new Exception(thrown));
21                 }
22               }
23             }
24           }
25 
26         } catch (InterruptedException e) {
27           if (!isStopped() || !isStopping()) {
28             LOG.info("socketQueue interrupted - stopping");
29             onAcceptError(e);
30           }
31         }
32       }
33     };
34     socketHandlerThread = new Thread(null, run, "ActiveMQ Transport Server Thread Handler: " + toString(), getStackSize());
35     socketHandlerThread.setDaemon(true);
36     socketHandlerThread.setPriority(ThreadPriorities.BROKER_MANAGEMENT - 1);
37     //啟動線程,該線程循環處理accept到的socket
38  socketHandlerThread.start();
39   }
40   //TcpTransportServer繼承了TransportServerThreadSupport,其本身也持有持有一個線程,
41   //該線程用來執行的run邏輯,TcpTransportServer的run邏輯就是accept。
42   super.doStart();
43 }

 

TcpTransportServer啟動后執行的run方法

 1 /**
 2 * 就是accept
 3 * pull Sockets from the ServerSocket
 4 */
 5 @Override
 6 public void run() {
 7   if (!isStopped() && !isStopping()) {
 8     final ServerSocket serverSocket = this.serverSocket;
 9     if (serverSocket == null) {
10       onAcceptError(new IOException("Server started without a valid ServerSocket"));
11     }
12         //channel不為null說明是nio
13     final ServerSocketChannel channel = serverSocket.getChannel();
14     if (channel != null) {
15         //nio
16       doRunWithServerSocketChannel(channel);
17     } else {
18         //以bio為例
19  doRunWithServerSocket(serverSocket);
20     }
21   }
22 }

TcpTransportServer的run方法就是accept

 1 private void doRunWithServerSocket(final ServerSocket serverSocket) {
 2   while (!isStopped()) {
 3       //accept
 4     Socket socket = serverSocket.accept();
 5     if (socket != null) {
 6       if (isStopped() || getAcceptListener() == null) {
 7         socket.close();
 8       } else {
 9           //默認useQueueForAccept為true,socketQueue中的Socket最終會由socketHandlerThread取出並執行handleSocket(socket),
10           //handleSocket最終會調用getAcceptListener().onAccept(configuredTransport)。
11         if (useQueueForAccept) {
12  socketQueue.put(socket);
13         } else {
14             //handleSocket最終會調用getAcceptListener().onAccept(configuredTransport)
15  handleSocket(socket);
16         }
17       }
18     }
19   }
20 }

 

在org.apache.activemq.broker.TransportConnector啟動時,設置了AcceptListener(getServer().setAcceptListener)該AcceptListener的主要邏輯:

 1 brokerService.getTaskRunnerFactory().execute(new Runnable() {
 2   @Override
 3   public void run() {
 4     try {
 5       if (!brokerService.isStopping()) {
 6           //transport中含有Socket,transport代表client
 7         Connection connection = createConnection(transport);
 8         //connection.start會執行transport.start,TcpTransport繼承了TransportServerThreadSupport,
 9         //start會開啟該線程的執行邏輯,即TcpTransport的run方法,阻塞讀取socket數據。
10  connection.start();
11       } else {
12         throw new BrokerStoppedException("Broker " + brokerService + " is being stopped");
13       }
14     } catch (Exception e) {
15       String remoteHost = transport.getRemoteAddress();
16       ServiceSupport.dispose(transport);
17       onAcceptError(e, remoteHost);
18     }
19   }
20 });

accept到的socket被包裝為TcpTransport,並start該Transport,socket就可以開始讀數據了

 1 //TcpTransport的run方法會調用doRun
 2 protected void doRun() throws IOException {
 3   try {
 4       //從Socket讀數據
 5     Object command = readCommand();
 6  doConsume(command);//執行命令,比如如果該command是一個Message,則會將其放入對應的destination  7   } catch (SocketTimeoutException e) {
 8   } catch (InterruptedIOException e) {
 9   }
10 }

 

小結:connector.start引發了TransportServer的start,TransportServer的start主要就是socketserver.accept(在獨立線程中),accept到的socket可以放入隊列異步處理,也可以同步處理(在accept所在線程中),二者的執行邏輯是一致的,都會將socket封裝成transport,再將transport封裝成connection后進行connection.start,這就開始阻塞讀了(transport會持有一個獨立線程,阻塞讀就在該線程中)。

 

TcpTransport的讀線程會阻塞在readCommand上,當client有數據傳來時,readCommand會讀取數據,並將其unmarshal為activemq的命令對象,然后再doConsume該對象。doConsume最終會調用broker(TransportConnection持有broker)的相關方法(比如,如果command是Message,就會調用broker.send(producerExchange, messageSend);),這就將Message交由broker處理了(接收client發來的命令並不屬於broker的職責,broker真正要做的是將處理這些命令,比如將消息路由置對應的destination,而接收client命令的任務是由TransportServer完成的)。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM