一,HTTP解碼器可能會將一個HTTP請求解析成多個消息對象。
ch.pipeline().addLast(new HttpServerCodec()); ch.pipeline().addLast(new ParseRequestHandler());
經過HttpServerCodec解碼之后,一個HTTP請求會導致:ParseRequestHandler的 channelRead()方法調用多次(測試時 "received message"輸出了兩次)
@Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { System.out.println("received message");
可以用HttpObjectAggregator 將多個消息轉換為單一的一個FullHttpRequest,如下:
ch.pipeline().addLast(new HttpServerCodec()); ch.pipeline().addLast(new HttpObjectAggregator(65536)); ch.pipeline().addLast(new ParseRequestHandler());
此時,一個HTTP消息(Object msg)是下面這樣的。
HttpObjectAggregator$AggregatedFullHttpRequest(decodeResult: success, version: HTTP/1.1, content: CompositeByteBuf(ridx: 0, widx: 17, cap: 17, components=1))
POST / HTTP/1.1
Host: 127.0.0.1:8888
User-Agent: Mozilla/5.0 (Windows NT 6.1; WOW64; rv:46.0) Gecko/20100101 Firefox/46.0
Accept: text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8
Accept-Language: null
Accept-Encoding: gzip, deflate
Content-Type: application/x-www-form-urlencoded
Content-Length: 17
Cookie: _ga=GA1.1.457486782.1446782739
Connection: keep-alive
從上面可以看出,實體首部字段Content-Length是17,表明實體主體有17個字節。
而我發送的消息是這樣的:
HTTP POST 請求,請求體是JSON格式的數據。這里使用的是json-lib解析的 Json字符串。代碼如下:
//parse job type 0,1 private String getJobType(FullHttpRequest request) throws IOException{ ByteBuf jsonBuf = request.content(); String jsonStr = jsonBuf.toString(CharsetUtil.UTF_8); JSONObject jsonObj = JSONObject.fromObject(jsonStr); String jobType = jsonObj.getString("jobType"); return jobType; }
需要注意是:使用json-lib解析Json字符串時,需要其他的依賴包如下:
解析完成之后,需要把處理后的結果發送到下一個ChannelHandler,進行下一步的處理。
@Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { //do some process ..... ctx.fireChannelRead(job); }
注意,使用的是fireChannelRead()方法,而不是 ctx.writeAndFlush(...)。因為,writeAndFlush/write 是Outbound,它是把消息發送到上一個Handler,進而發送到remote peer,而這里是InBound。具體參考:
這里,通過 ctx.fireChannelRead(job); 將處理后的結果發送到下一個Channel處理。
ch.pipeline().addLast(new HttpServerCodec()); ch.pipeline().addLast(new HttpObjectAggregator(2048)); ch.pipeline().addLast(new ParseRequestHandler()); ch.pipeline().addLast(new OozieRequestHandler());
下一個Handler是OozieRequestHandler,它負責向Oozie Server提交作業,之后返回jobId給客戶端(HttpServerCodec Handler 負責底層傳輸細節)。
Netty構造一個http 響應的方法如下:
String jobId = doPost(jobConfig); FullHttpResponse response = new DefaultFullHttpResponse( HttpVersion.HTTP_1_1, HttpResponseStatus.OK, Unpooled.wrappedBuffer(jobId.getBytes())); response.headers().set(CONTENT_TYPE, "application/xml"); response.headers().setInt(CONTENT_LENGTH, response.content().readableBytes()); ctx.write(response).addListener(ChannelFutureListener.CLOSE);
整個完整代碼可參考:https://github.com/hapjin/netty_schedule