Netty HTTP+XML協議棧開發
由於HTTP協議的通用性,很多異構系統間的通信交互采用HTTP協議,通過HTTP協議承載業務數據進行消息交互,例如非常流行的HTTP+XML或者RESTful+JSON。
場景設計
模擬一個簡單的用戶訂購系統。客戶端填寫訂單,通過HTTP客戶端向服務端發送訂購請求,請求消息放在HTTP消息體中,以XML承載,即采用HTTP+XML的方式進行通信。HTTP服務端接收到訂購請求后,對訂單請求進行修改,然后通過HTTP+XML的方式返回應答消息。雙方采用HTTP1.1協議,連接類型為CLOSE方式,即雙方交互完成,由HTTP服務端主動關閉鏈路,隨后客戶端也關閉鏈路並退出。
訂購請求消息定義如表:
客戶信息定義如表:
地址信息定義如表:
郵遞方式定義如表:
HTTP+XML協議棧設計
首先對訂購流程圖進行分析,先看步驟1,構造訂購請求消息並將其編碼為HTTP+XML形式。Netty的HTTP協議棧提供了構造HTTP請求消息的相關接口,但是無法將普通的POJO對象轉換為HTTP+XML的HTTP請求消息,需要自定義HTTP+XML格式的請求消息編碼器。
再看步驟2,利用Netty的HTTP協議棧,可以支持HTTP鏈路的建立和請求消息的發送,所以不需要額外開發,直接重用Netty的能力即可。
步驟3,HTTP服務端需要將HTTP+XML格式的訂購請求消息解碼為訂購請求POJO對象,同時獲取HTTP請求消息頭信息。利用Netty的HTTP協議棧服務端,可以完成HTTP請求消息的解碼,但是,如果消息體為XML格式,Netty無法支持將其解碼為POJO對象,需要在Netty協議棧的基礎上擴展實現。
步驟4,服務端對訂購請求消息處理完成后,重新將其封裝成XML,通過HTTP應答消息體攜帶給客戶端,Netty的HTTP協議棧不支持直接將POJO對象的應答消息以XML方式發送,需要定制。
步驟5,HTTP客戶端需要將HTTP+XML格式的應答消息解碼為訂購POJO對象,同時能夠獲取應答消息的HTTP頭信息,Netty的協議棧不支持自動的消息解碼。
通過分析,我們可以了解到哪些能力是Netty支持的,哪些需要擴展開發實現。下面給出設計思路。
(1)需要一套通用、高性能的XML序列化框架,它能夠靈活地實現POJO-XML的互相轉換,最好能夠通過工具自動生成綁定關系,或者通過XML的方式配置雙方的映射關系;
(2)作為通用的HTTP+XML協議棧,XML-POJO對象的映射關系應該非常靈活,支持命名空間和自定義標簽;
(3)提供HTTP+XML請求消息編碼器,供HTTP客戶端發送請求消息自動編碼使用;
(4)提供HTTP+XML請求消息解碼器,供HTTP服務端對請求消息自動解碼使用;
(5)提供HTTP+XML響應消息編碼器,供HTTP服務端發送響應消息自動編碼使用;
(6)提供HTTP+XML響應消息編碼器,供HTTP客戶端對應答消息進行自動解碼使用;
(7)協議棧使用者不需要關心HTTP+XML的編解碼,對上層業務零侵入,業務只需要對上層的業務POJO對象進行編排。
高效的XML綁定框架JiBx
使用JiBX綁定XML文檔與Java對象需要分兩步走:第一步是綁定XML文件,也就是映射XML文件與Java對象之間的對應關系;第二步是在運行時,實現XML文件與Java實例之間的互相轉換。這時,它已經與綁定文件無關了,可以說是完全脫耦了。
在運行程序之前,需要先配置綁定文件並進行綁定,在綁定過程中它將會動態地修改程序中相應的class文件,主要是生成對應對象實例的方法和添加被綁定標記的屬性JiBX_bindingList等。它使用的技術是BCEL(Byte Code Engineering Library),BCEL是Apache Software Foundation的Jakarta項目的一部分,也是目前Java classworking最廣泛使用的一種框架,它可以讓你深入JVM匯編語言進行類操作。在JiBX運行時,它使用了目前比較流行的一個技術XPP(Xml Pull Parsing),這也正是JiBX如此高效的原因。
JiBx有兩個比較重要的概念:Unmarshal(數據分解)和Marshal(數據編排)。從字面意思也很容易理解,Unmarshal是將XML文件轉換成Java對象,而Marshal則是將Java對象編排成規范的XML文件。JiBX在Unmarshal/Marshal上如此高效,這要歸功於使用了XPP技術,而不是使用基於樹型(tree-based)方式,將整個文檔寫入內存,然后進行操作的DOM(Document Object Model),也不是使用基於事件流(event stream)的SAX(Simple API for Xml)。XPP使用的是不斷增加的數據流處理方式,同時允許在解析XML文件時中斷。
POJO對象定義完成之后,通過Ant腳本來生成XML和POJO對象的綁定關系文件,同時也附加生成XML的Schema定義文件。
JiBx的綁定和編譯,通過JiBx的org.jibx.binding.generator.BindGen工具類可以將指定的POJO對象Order類生成綁定文件和Schema定義文件。
JiBx的編譯命令,它的作用是根據綁定文件和POJO對象的映射關系和規則動態修改POJO類
代碼示例:
import lombok.Data; @Data public class Address { /** * First line of street information (required). */ private String street1; /** * Second line of street information (optional). */ private String street2; private String city; /** * State abbreviation (required for the U.S. and Canada, optional * otherwise). */ private String state; /** * Postal code(required for the U.S.and Canada,optional otherwise). */ private String postCode; /** * Country name (optional, U.S. assumed if not supplied). */ private String country; } import lombok.Data; import java.util.List; @Data public class Customer { private long customerNumber; /** * Personal name. */ private String firstName; /** * Family name. */ private String lastName; /** * Middle name(s), if any. */ private List middleNames; } import lombok.Data; @Data public class Order { private long orderNumber; private Customer customer; private Address billTo; private Shipping shipping; private Address shipTo; private Float total; } public enum Shipping { STANDARD_MAIL, PRIORITY_MAIL, INTERNATIONAL_MAIL, DOMESTIC_EXPRESS, INTERNATIONAL_EXPRESS } import org.jibx.runtime.*; import java.io.IOException; import java.io.StringReader; import java.io.StringWriter; import java.util.Arrays; public class TestOrder { private IBindingFactory factory = null; private StringWriter writer = null; private StringReader reader = null; private final static String CHARSET_NAME = "UTF-8"; private String encode2Xml(Order order) throws JiBXException, IOException { factory = BindingDirectory.getFactory(Order.class); writer = new StringWriter(); IMarshallingContext mctx = factory.createMarshallingContext(); mctx.setIndent(2); mctx.marshalDocument(order, CHARSET_NAME, null, writer); String xmlStr = writer.toString(); writer.close(); System.out.println(xmlStr.toString()); return xmlStr; } private Order decode2Order(String xmlBody) throws JiBXException { reader = new StringReader(xmlBody); IUnmarshallingContext uctx = factory.createUnmarshallingContext(); Order order = (Order) uctx.unmarshalDocument(reader); return order; } public static void main(String[] args) throws JiBXException, IOException { TestOrder test = new TestOrder(); Order order = new Order(); order.setOrderNumber(123); Customer customer = new Customer(); customer.setFirstName("ali"); customer.setMiddleNames(Arrays.asList("baba")); customer.setLastName("taobao"); order.setCustomer(customer); Address address = new Address(); address.setCity("南京市"); address.setCountry("中國"); address.setPostCode("123321"); address.setState("江蘇省"); address.setStreet1("龍眠大道"); address.setStreet2("INTERNATIONAL_MAIL"); order.setBillTo(address); order.setShipTo(address); order.setShipping(Shipping.INTERNATIONAL_MAIL); order.setTotal(33f); String body = test.encode2Xml(order); Order order2 = test.decode2Order(body); System.out.println(order2); } } <dependency> <groupId>org.jibx</groupId> <artifactId>jibx-bind</artifactId> <version>1.3.0</version> </dependency>
基礎封裝類示例
import io.netty.handler.codec.http.FullHttpRequest; import lombok.Data; @Data public class HttpXmlRequest { //它包含兩個成員變量FullHttpRequest和編碼對象Object,用於實現和協議棧之間的解耦。 private FullHttpRequest request; private Object body; public HttpXmlRequest(FullHttpRequest request, Object body) { this.body = body; this.request = request; } } import io.netty.buffer.Unpooled; import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.http.DefaultFullHttpResponse; import io.netty.handler.codec.http.FullHttpRequest; import io.netty.handler.codec.http.FullHttpResponse; import io.netty.handler.codec.http.HttpResponseStatus; import io.netty.util.CharsetUtil; import java.util.List; import static io.netty.handler.codec.http.HttpResponseStatus.*; import static io.netty.handler.codec.http.HttpHeaders.Names.*; import static io.netty.handler.codec.http.HttpVersion.*; public class HttpXmlRequestDecoder extends AbstractHttpXmlDecoder { public HttpXmlRequestDecoder(Class clazz) { this(clazz, false); } //HttpXmlRequestDecoder有兩個參數,分別為需要解碼的對象的類型信息和是否打印HTTP消息體碼流的碼流開關,碼流開關默認關閉。 public HttpXmlRequestDecoder(Class clazz, boolean isPrint) { super(clazz, isPrint); } @Override protected void decode(ChannelHandlerContext arg0, Object o, List arg2) throws Exception { FullHttpRequest arg1 = (FullHttpRequest)o; //首先對HTTP請求消息本身的解碼結果進行判斷,如果已經解碼失敗,再對消息體進行二次解碼已經沒有意義。 if (!arg1.getDecoderResult().isSuccess()) { //如果HTTP消息本身解碼失敗,則構造處理結果異常的HTTP應答消息返回給客戶端。 sendError(arg0, BAD_REQUEST); return; } //通過HttpXmlRequest和反序列化后的Order對象構造HttpXmlRequest實例,最后將它添加到解碼結果List列表中。 HttpXmlRequest request = new HttpXmlRequest(arg1, decode0(arg0,arg1.content())); arg2.add(request); } private static void sendError(ChannelHandlerContext ctx, HttpResponseStatus status) { FullHttpResponse response = new DefaultFullHttpResponse(HTTP_1_1, status, Unpooled.copiedBuffer("Failure: " + status.toString() + "\r\n", CharsetUtil.UTF_8)); response.headers().set(CONTENT_TYPE, "text/plain; charset=UTF-8"); ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE); } } import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.http.*; import java.net.InetAddress; import java.util.List; public class HttpXmlRequestEncoder extends AbstractHttpXmlEncoder { @Override protected void encode(ChannelHandlerContext ctx, Object o,List out) throws Exception { HttpXmlRequest msg = (HttpXmlRequest)o; //首先調用父類的encode0,將業務需要發送的POJO對象Order實例通過JiBx序列化為XML字符串 //隨后將它封裝成Netty的ByteBuf。 ByteBuf body = encode0(ctx, msg.getBody()); FullHttpRequest request = msg.getRequest(); //對消息頭進行判斷,如果業務自定義和定制了消息頭,則使用業務側設置的HTTP消息頭, //如果業務側沒有設置,則構造新的HTTP消息頭。 if (request == null) { //用來構造和設置默認的HTTP消息頭,由於通常情況下HTTP通信雙方更關注消息體本身,所以這里采用了硬編碼的方式, //如果要產品化,可以做成XML配置文件,允許業務自定義配置,以提升定制的靈活性。 request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1,HttpMethod.GET, "/do", body); HttpHeaders headers = request.headers(); headers.set(HttpHeaders.Names.HOST, InetAddress.getLocalHost().getHostAddress()); headers.set(HttpHeaders.Names.CONNECTION, HttpHeaders.Values.CLOSE); headers.set(HttpHeaders.Names.ACCEPT_ENCODING, HttpHeaders.Values.GZIP.toString() + ',' + HttpHeaders.Values.DEFLATE.toString()); headers.set(HttpHeaders.Names.ACCEPT_CHARSET,"ISO-8859-1,utf-8;q=0.7,*;q=0.7"); headers.set(HttpHeaders.Names.ACCEPT_LANGUAGE, "zh"); headers.set(HttpHeaders.Names.USER_AGENT,"Netty xml Http Client side"); headers.set(HttpHeaders.Names.ACCEPT,"text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8"); } //由於請求消息消息體不為空,也沒有使用Chunk方式,所以在HTTP消息頭中設置消息體的長度Content-Length, //完成消息體的XML序列化后將重新構造的HTTP請求消息加入到out中, //由后續Netty的HTTP請求編碼器繼續對HTTP請求消息進行編碼。 HttpHeaders.setContentLength(request, body.readableBytes()); out.add(request); } } import io.netty.handler.codec.http.FullHttpResponse; //它包含兩個成員變量:FullHttpResponse和Object,Object就是業務需要發送的應答POJO對象。 public class HttpXmlResponse { private FullHttpResponse httpResponse; private Object result; public HttpXmlResponse(FullHttpResponse httpResponse, Object result) { this.httpResponse = httpResponse; this.result = result; } public final FullHttpResponse getHttpResponse() { return httpResponse; } public final void setHttpResponse(FullHttpResponse httpResponse) { this.httpResponse = httpResponse; } public final Object getResult() { return result; } public final void setResult(Object result) { this.result = result; } } import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.http.DefaultFullHttpResponse; import java.util.List; public class HttpXmlResponseDecoder extends AbstractHttpXmlDecoder { public HttpXmlResponseDecoder(Class clazz) { this(clazz, false); } public HttpXmlResponseDecoder(Class clazz, boolean isPrintlog) { super(clazz, isPrintlog); } @Override protected void decode(ChannelHandlerContext ctx,Object o, List out) throws Exception { DefaultFullHttpResponse msg = (DefaultFullHttpResponse)o; HttpXmlResponse resHttpXmlResponse = new HttpXmlResponse(msg, decode0( ctx, msg.content())); out.add(resHttpXmlResponse); } } import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.http.DefaultFullHttpResponse; import io.netty.handler.codec.http.FullHttpResponse; import java.util.List; import static io.netty.handler.codec.http.HttpHeaders.Names.CONTENT_TYPE; import static io.netty.handler.codec.http.HttpHeaders.setContentLength; import static io.netty.handler.codec.http.HttpResponseStatus.OK; import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1; public class HttpXmlResponseEncoder extends AbstractHttpXmlEncoder { protected void encode(ChannelHandlerContext ctx, Object o, List out) throws Exception { HttpXmlResponse msg = (HttpXmlResponse) o; ByteBuf body = encode0(ctx, msg.getResult()); FullHttpResponse response = msg.getHttpResponse(); if (response == null) { response = new DefaultFullHttpResponse(HTTP_1_1, OK, body); } else { response = new DefaultFullHttpResponse(msg.getHttpResponse() .getProtocolVersion(), msg.getHttpResponse().getStatus(), body); } response.headers().set(CONTENT_TYPE, "text/xml"); setContentLength(response, body.readableBytes()); out.add(response); } } import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.MessageToMessageDecoder; import org.jibx.runtime.BindingDirectory; import org.jibx.runtime.IBindingFactory; import org.jibx.runtime.IUnmarshallingContext; import java.io.StringReader; import java.nio.charset.Charset; public abstract class AbstractHttpXmlDecoder extends MessageToMessageDecoder { private IBindingFactory factory; private StringReader reader; private Class clazz; private boolean isPrint; private final static String CHARSET_NAME = "UTF-8"; private final static Charset UTF_8 = Charset.forName(CHARSET_NAME); protected AbstractHttpXmlDecoder(Class clazz) { this(clazz, false); } protected AbstractHttpXmlDecoder(Class clazz, boolean isPrint) { this.clazz = clazz; this.isPrint = isPrint; } protected Object decode0(ChannelHandlerContext arg0, ByteBuf body) throws Exception { //從HTTP的消息體中獲取請求碼流,然后通過JiBx類庫將XML轉換成POJO對象。 factory = BindingDirectory.getFactory(clazz); String content = body.toString(UTF_8); //根據碼流開關決定是否打印消息體碼流。 //增加碼流開關往往是為了方便問題定位,在實際項目中,需要打印到日志中。 if (isPrint) { System.out.println("The body is : " + content); } reader = new StringReader(content); IUnmarshallingContext uctx = factory.createUnmarshallingContext(); Object result = uctx.unmarshalDocument(reader); reader.close(); reader = null; return result; } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { // 釋放資源 //如果解碼發生異常,要判斷StringReader是否已經關閉, //如果沒有關閉,則關閉輸入流並通知JVM對其進行垃圾回收。 if (reader != null) { reader.close(); reader = null; } } } import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.MessageToMessageEncoder; import org.jibx.runtime.BindingDirectory; import org.jibx.runtime.IBindingFactory; import org.jibx.runtime.IMarshallingContext; import java.io.StringWriter; import java.nio.charset.Charset; public abstract class AbstractHttpXmlEncoder extends MessageToMessageEncoder { IBindingFactory factory = null; StringWriter writer = null; final static String CHARSET_NAME = "UTF-8"; final static Charset UTF_8 = Charset.forName(CHARSET_NAME); protected ByteBuf encode0(ChannelHandlerContext ctx, Object body) throws Exception { //在此將業務的Order實例序列化為XML字符串。 factory = BindingDirectory.getFactory(body.getClass()); writer = new StringWriter(); IMarshallingContext mctx = factory.createMarshallingContext(); mctx.setIndent(2); mctx.marshalDocument(body, CHARSET_NAME, null, writer); String xmlStr = writer.toString(); writer.close(); writer = null; //將XML字符串包裝成Netty的ByteBuf並返回,實現了HTTP請求消息的XML編碼。 ByteBuf encodeBuf = Unpooled.copiedBuffer(xmlStr, UTF_8); return encodeBuf; } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { // 釋放資源 if (writer != null) { writer.close(); writer = null; } } }
服務端代碼示例
import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.codec.http.HttpObjectAggregator; import io.netty.handler.codec.http.HttpRequestDecoder; import io.netty.handler.codec.http.HttpResponseEncoder; import java.net.InetSocketAddress; public class HttpXmlServer { public void run(final int port) throws Exception { EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer() { @Override protected void initChannel(Channel ch) throws Exception { ch.pipeline().addLast("http-decoder",new HttpRequestDecoder()); ch.pipeline().addLast("http-aggregator",new HttpObjectAggregator(65536)); ch.pipeline().addLast("xml-decoder",new HttpXmlRequestDecoder(Order.class, true)); ch.pipeline().addLast("http-encoder",new HttpResponseEncoder()); ch.pipeline().addLast("xml-encoder",new HttpXmlResponseEncoder()); ch.pipeline().addLast("xmlServerHandler",new HttpXmlServerHandler()); } }); ChannelFuture future = b.bind(new InetSocketAddress(port)).sync(); System.out.println("HTTP訂購服務器啟動,網址是 : " + "http://localhost:" + port); future.channel().closeFuture().sync(); } finally { bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } public static void main(String[] args) throws Exception { int port = 8080; if (args.length > 0) { try { port = Integer.parseInt(args[0]); } catch (NumberFormatException e) { e.printStackTrace(); } } new HttpXmlServer().run(port); } } import io.netty.buffer.Unpooled; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; import io.netty.handler.codec.http.DefaultFullHttpResponse; import io.netty.handler.codec.http.FullHttpResponse; import io.netty.handler.codec.http.HttpRequest; import io.netty.handler.codec.http.HttpResponseStatus; import io.netty.util.CharsetUtil; import io.netty.util.concurrent.Future; import io.netty.util.concurrent.GenericFutureListener; import java.util.ArrayList; import java.util.List; import static io.netty.handler.codec.http.HttpHeaders.Names.CONTENT_TYPE; import static io.netty.handler.codec.http.HttpHeaders.isKeepAlive; import static io.netty.handler.codec.http.HttpResponseStatus.INTERNAL_SERVER_ERROR; import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1; public class HttpXmlServerHandler extends SimpleChannelInboundHandler { @Override public void messageReceived(final ChannelHandlerContext ctx,Object o) throws Exception { HttpXmlRequest xmlRequest = (HttpXmlRequest)o; HttpRequest request = xmlRequest.getRequest(); Order order = (Order) xmlRequest.getBody(); System.out.println("Http server receive request : " + order); dobusiness(order); ChannelFuture future = ctx.writeAndFlush(new HttpXmlResponse(null, order)); if (!isKeepAlive(request)) { future.addListener(new GenericFutureListener() { public void operationComplete (Future future)throws Exception { ctx.close(); } }); } } private void dobusiness(Order order) { order.getCustomer().setFirstName("狄"); order.getCustomer().setLastName("仁傑"); List midNames = new ArrayList(); midNames.add("李元芳"); order.getCustomer().setMiddleNames(midNames); Address address = order.getBillTo(); address.setCity("洛陽"); address.setCountry("大唐"); address.setState("河南道"); address.setPostCode("123456"); order.setBillTo(address); order.setShipTo(address); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)throws Exception { cause.printStackTrace(); if (ctx.channel().isActive()) { sendError(ctx, INTERNAL_SERVER_ERROR); } } private static void sendError(ChannelHandlerContext ctx, HttpResponseStatus status) { FullHttpResponse response = new DefaultFullHttpResponse(HTTP_1_1, status, Unpooled.copiedBuffer("失敗: " + status.toString() + "\r\n", CharsetUtil.UTF_8)); response.headers().set(CONTENT_TYPE, "text/plain; charset=UTF-8"); ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE); } }
客戶端代碼示例
import io.netty.bootstrap.Bootstrap; import io.netty.channel.*; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.codec.http.HttpObjectAggregator; import io.netty.handler.codec.http.HttpRequestEncoder; import io.netty.handler.codec.http.HttpResponseDecoder; import java.net.InetSocketAddress; public class HttpXmlClient { public void connect(int port) throws Exception { // 配置客戶端NIO線程組 EventLoopGroup group = new NioEventLoopGroup(); try { Bootstrap b = new Bootstrap(); b.group(group).channel(NioSocketChannel.class) .option(ChannelOption.TCP_NODELAY, true) .handler(new ChannelInitializer() { @Override public void initChannel(Channel ch) throws Exception { ch.pipeline().addLast("http-decoder",new HttpResponseDecoder()); ch.pipeline().addLast("http-aggregator",new HttpObjectAggregator(65536)); // XML解碼器 ch.pipeline().addLast("xml-decoder",new HttpXmlResponseDecoder(Order.class,true)); ch.pipeline().addLast("http-encoder",new HttpRequestEncoder()); ch.pipeline().addLast("xml-encoder",new HttpXmlRequestEncoder()); ch.pipeline().addLast("xmlClientHandler",new HttpXmlClientHandle()); } }); // 發起異步連接操作 ChannelFuture f = b.connect(new InetSocketAddress(port)).sync(); // 等待客戶端鏈路關閉 f.channel().closeFuture().sync(); } finally { // 優雅退出,釋放NIO線程組 group.shutdownGracefully(); } } public static void main(String[] args) throws Exception { int port = 8080; if (args != null && args.length > 0) { try { port = Integer.valueOf(args[0]); } catch (NumberFormatException e) { // 采用默認值 } } new HttpXmlClient().connect(port); } } import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; public class HttpXmlClientHandle extends SimpleChannelInboundHandler { @Override public void channelActive(ChannelHandlerContext ctx) { HttpXmlRequest request = new HttpXmlRequest(null,OrderFactory.create(123)); ctx.writeAndFlush(request); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { cause.printStackTrace(); ctx.close(); } @Override protected void messageReceived(ChannelHandlerContext ctx,Object o) throws Exception { HttpXmlResponse msg = (HttpXmlResponse)o; System.out.println("The client receive response of http header is : " + msg.getHttpResponse().headers().names()); System.out.println("The client receive response of http body is : " + msg.getResult()); } } public class OrderFactory { public static Order create(long orderID) { Order order = new Order(); order.setOrderNumber(orderID); order.setTotal(9999.999f); Address address = new Address(); address.setCity("南京市"); address.setCountry("中國"); address.setPostCode("123321"); address.setState("江蘇省"); address.setStreet1("龍眠大道"); order.setBillTo(address); Customer customer = new Customer(); customer.setCustomerNumber(orderID); customer.setFirstName("李"); customer.setLastName("林峰"); order.setCustomer(customer); order.setShipping(Shipping.INTERNATIONAL_MAIL); order.setShipTo(address); return order; } }
測試結果:
服務端請求消息碼流輸出:
服務端解碼后的業務對象輸出
Http server receive request : Order [orderNumber=123, customer=Customer [customerNumber=123, firstName=李, lastName=林峰, middleNames=null], billTo= Address [street1=龍眠大道, street2=null, city=南京市, state=江蘇省, postCode= 123321, country=中國], shipping=INTERNATIONAL_MAIL, shipTo=Address [street1=龍眠大道, street2=null, city=南京市, state=江蘇省, postCode=123321, country=中國], total=9999.999]
客戶端響應消息碼流輸出:
客戶端解碼后的業務對象輸出
The client receive response of http body is : Order [orderNumber=123, customer=Customer [customerNumber=123, firstName=狄, lastName=仁傑, middleNames=[李元芳]], billTo=Address [street1=龍眠大道, street2=null, city=洛陽, state=河南道, postCode=123456, country=大唐], shipping=INTERNATIONAL_MAIL, shipTo=Address [street1=龍眠大道, street2=null, city=洛陽, state=河南道, postCode=123456, country=大唐], total=9999.999]