Java網關服務-AIO(三)


Java網關服務-AIO(三)

概述

前兩節中,我們已經獲取了body的總長度,剩下的就是讀出body,處理請求

ChannelServerHandler

ChannelServerHandler即從channel中讀取請求,也向channle輸出結果,因此它實現了InboundHandler, OutboundHandler

/**
 * 讀取請求的內容,業務處理
 */
public class ChannelServerHandler implements CompletionHandler<Integer, ByteBuffer>, InboundHandler, OutboundHandler {

	private final static Logger LOGGER = LoggerFactory.getLogger(ChannelServerHandler.class);

	private AsynchronousSocketChannel channel;

	public ChannelServerHandler(AsynchronousSocketChannel channel) {
		this.channel = channel;
	}

	public void completed(Integer result, ByteBuffer attachment) {
		//如果條件成立,說明客戶端主動終止了TCP套接字,這時服務端終止就可以了
		if (result == -1) {
			System.out.println("remote is close");
			closeChannel();
			return;
		}

		Object resultData;
		String req = (String) read(channel, attachment);
		if (req == null) {
			closeChannel();
			return;
		}

		try {
			LOGGER.info("socket:{}", channel.getRemoteAddress());

			//同步處理請求
			RequestHandler requestHandler = ApplicationUtils.getBean(RequestHandler.class);
			resultData = requestHandler.execute(req);

		} catch (Throwable t) {
			resultData = Result.error("ERROR", Utils.error(t));
			LOGGER.error("調用接口失敗", t);
		}

		if (resultData == null) {
			resultData = Result.failure("FAILURE", "調用失敗,數據為空.");
		}
		try {
			String resultContent =  resultData instanceOf String ? (String) resultData : JSON.toJSONString(resultData);
			byte[] bytes = resultContent.getBytes("UTF-8");
			ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length);
			writeBuffer.put(bytes);
			writeBuffer.flip();

			write(channel, writeBuffer);
		} catch (Exception e) {
			LOGGER.error("對象轉JSON失敗,對象:{}", resultData, e);
		}

		closeChannel();
	}

	@Override
	public Object read(AsynchronousSocketChannel socketChannel, ByteBuffer in) {
		in.flip();
		byte[] body = new byte[in.remaining()];
		in.get(body);

		String req = null;
		try {
			req = new String(body, "UTF-8");
		} catch (UnsupportedEncodingException e) {
			e.printStackTrace();
		}
		return req;
	}

	@Override
	public Object write(AsynchronousSocketChannel socketChannel, ByteBuffer out) {
		//write,write操作結束后關閉通道
		channel.write(out, out, new CompletionHandler<Integer, ByteBuffer>() {
			@Override
			public void completed(Integer result, ByteBuffer attachment) {
				closeChannel();
			}

			@Override
			public void failed(Throwable exc, ByteBuffer attachment) {
				closeChannel();
			}
		});
		return null;
	}

	public void failed(Throwable exc, ByteBuffer attachment) {
		closeChannel();
	}

	private void closeChannel() {
		try {
			this.channel.close();
		} catch (IOException e) {
			e.printStackTrace();
		}
	}

}

讀取body

		in.flip();
		byte[] body = new byte[in.remaining()];
		in.get(body);

		String req = null;
		try {
			req = new String(body, "UTF-8");
		} catch (UnsupportedEncodingException e) {
			e.printStackTrace();
		}
		return req;	
in.remaining()

buffer中含有的字節數

客戶端、服務端由於跨語言和經驗問題,沒有使用復雜的跨語言序列化技術,雙方約定使用UTF-8編碼,通過將body轉換為String,最終獲得了客戶端傳遞的字符串。

處理請求

經過自定義的請求處理邏輯,同步處理,最終將響應編碼后,發送給客戶端,write操作結束后,關閉連接

總結

使用AIO開發服務端時,主要涉及

  • 配置I/O事件完成的回調線程池
  • 從accept -> read 到 向client端響應 write -> close,盡量使用CompletionHanlder來異步處理,不要在處理某個事件完成的線程中,同步的調用,如future.get()
  • 如果是短連接,則需在write操作時注冊write結束后的handler,在handler中關閉連接

擴展

長連接該如何處理

  • 長連接意味着client可以發多次請求,由於多次請求被server執行的順序是不可控的,可能后發的請求先響應,因此需要在請求和響應時,加requestId,據此對應到請求的結果
  • 長連接不需要在write后關閉連接
  • 長連接需要開發定時的ping-pong心跳消息
  • 長連接在響應時比現在更復雜,也需要一個和請求類似或相同的協議來標識body長度

測試

測試用例

	/**
	 * mvn -Dtest=com.jd.jshop.web.sdk.test.ClientTest#pingReqSocket test
	 *
	 * @throws IOException
	 */
	@Test
	@PerfTest(invocations = 20000, threads = 50)
	public void pingReqSocket() throws IOException {
		
		byte[] content = "ping".getBytes("UTF-8");
		String result = sendReq(content);

		//斷言 是否和預期一致
		Assert.assertEquals("pong", result);
	}

	private String sendReq(byte[] content) throws IOException {
		ByteBuffer writeBuffer = ByteBuffer.allocate(4 + content.length);
		writeBuffer.putInt(content.length);
		writeBuffer.put(content);
		writeBuffer.flip();


		Socket socket = new Socket();
		socket.connect(new InetSocketAddress("127.0.0.1", 9801));
		socket.getOutputStream().write(writeBuffer.array());
		socket.getOutputStream().flush();
		byte[] buf = new byte[1024];
		int len = 0;
		String result = null;
		while ((len = socket.getInputStream().read(buf)) != -1) {
			result = new String(buf, 0, len);
			System.out.println(result);
		}
		return result;
	}

測試的方法是,在服務器上建立socket連接,向server發送ping,server返回pong
測試服務器:centos, 2個物理核,4個邏輯核,內存16G

分析aio的實現:
在ping-pong測試中性能極高,優於並甩開netty

以下是使用Netty開發的server端的測試用例,可以和上面的圖片對比一下

Measured invocations:	10,000	
Thread Count:	10	
		 
	Measured
(system)	Required
Execution time:	1,646 ms	
Throughput:	6,075 / s	
Min. latency:	0 ms	
Average latency:	1 ms	
Median:	2 ms	
90%:	2 ms	
Max latency:	26 ms	

============================


Started at:	Oct 16, 2018 5:27:03 PM
Measured invocations:	20,000	
Thread Count:	20	
		 
	Measured
(system)	Required
Execution time:	3,293 ms	
Throughput:	6,073 / s	
Min. latency:	0 ms	
Average latency:	3 ms	
Median:	3 ms	
90%:	5 ms	
Max latency:	54 ms	

============================

Started at:	Oct 16, 2018 5:28:24 PM
Measured invocations:	20,000	
Thread Count:	10	
		 
	Measured
(system)	Required
Execution time:	3,051 ms	
Throughput:	6,555 / s	
Min. latency:	0 ms	
Average latency:	1 ms	
Median:	1 ms	
90%:	2 ms	
Max latency:	44 ms	

============================

Started at:	Oct 16, 2018 5:30:06 PM
Measured invocations:	20,000	
Thread Count:	50	
		 
	Measured
(system)	Required
Execution time:	3,167 ms	
Throughput:	6,315 / s	
Min. latency:	0 ms	
Average latency:	7 ms	
Median:	7 ms	
90%:	10 ms	
Max latency:	64 ms	

分析基於Netty的實現:
吞吐量:6000+/s
10個線程時:90%低於2ms,平均1ms
20個線程時:90%低於5ms,平均3ms
50個線程時:90%低於10ms,平均7ms

線程越多,性能越低

當前測試用例不太依賴內存
執行10000+次請求,建立10000+連接,要求服務器對單個進程fd限制打開,防止報too many open files導致測試用例執行失敗

    ulimit -n 20240


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM