jboss marshalling是jboss內部的一個序列化框架,速度也十分快,這里netty也提供了支持,使用十分方便。
在使用TCP進行網絡通訊時,為了解決TCP粘包、拆包問題,一般會用以下幾種方式:
1、 消息定長 例如每個報文的大小固定為200個字節,如果不夠,空位補空格;
2、 在消息尾部添加特殊字符進行分割,如添加回車;
3、 將消息分為消息體和消息頭,在消息頭里面包含表示消息長度的字段,然后進行業務邏輯的處理。
在Netty中我們主要利用對象的序列化進行對象的傳輸,雖然Java本身的序列化也能完成,但是Java序列化有很多問題,如序列化後的字節碼流太大,以及序列化效率太低等。JBoss的序列化具有效率較高、序列化後碼流較小的優點。這裡利用JBoss的Marshalling測試一個簡單的對象序列化。
引入marshalling
<!-- https://mvnrepository.com/artifact/io.netty/netty-all -->
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>5.0.0.Alpha2</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.jboss.marshalling/jboss-marshalling -->
<dependency>
<groupId>org.jboss.marshalling</groupId>
<artifactId>jboss-marshalling</artifactId>
<version>2.0.0.CR1</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.jboss.marshalling/jboss-marshalling-serial -->
<dependency>
<groupId>org.jboss.marshalling</groupId>
<artifactId>jboss-marshalling-serial</artifactId>
<version>2.0.0.CR1</version>
</dependency>
server:
public class Server {
public static void main(String[] args) throws InterruptedException {
//1.第一個線程組是用於接收Client端連接的
EventLoopGroup bossGroup = new NioEventLoopGroup();
//2.第二個線程組是用於實際的業務處理的
EventLoopGroup workerGroup = new NioEventLoopGroup();
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup);//綁定兩個線程池
b.channel(NioServerSocketChannel.class);//指定NIO的模式,如果是客戶端就是NioSocketChannel
// b.option(ChannelOption.SO_BACKLOG, 1024);//TCP的緩沖區設置
// b.option(ChannelOption.SO_SNDBUF, 32*1024);//設置發送緩沖的大小
// b.option(ChannelOption.SO_RCVBUF, 32*1024);//設置接收緩沖區大小
// b.option(ChannelOption.SO_KEEPALIVE, true);//保持連續
b.childHandler(new ChannelInitializer<SocketChannel>() {
protected void initChannel(SocketChannel ch) throws Exception {
//設置Marshalling的編碼和解碼
ch.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingDecoder());
ch.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingEncoder());
ch.pipeline().addLast(new ServerHandler());
}
});
ChannelFuture future = b.bind(8765).sync();//綁定端口
future.channel().closeFuture().sync();//等待關閉(程序阻塞在這里等待客戶端請求)
bossGroup.shutdownGracefully();//關閉線程
workerGroup.shutdownGracefully();//關閉線程
}
}
ServerHandler
/**
 * Server-side business handler: answers every decoded {@link Send} with a {@link Receive}
 * carrying the same id, name and message.
 */
public class ServerHandler extends ChannelHandlerAdapter {
    /**
     * Prints the failure and closes the channel, mirroring what {@code ClientHandler} does, so a
     * connection that hit an error is not left open.
     *
     * @param ctx   context of the channel on which the error occurred
     * @param cause the error raised in the pipeline
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
            throws Exception {
        cause.printStackTrace();
        ctx.close();
    }

    /**
     * Logs the incoming request and writes back a {@link Receive} copied from it.
     *
     * @param ctx context used to write the response
     * @param msg the inbound message; cast to {@link Send}, so any other type raises a
     *            {@link ClassCastException}
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg)
            throws Exception {
        Send send = (Send) msg;
        System.out.println("client發送:" + send);
        Receive receive = new Receive();
        receive.setId(send.getId());
        receive.setMessage(send.getMessage());
        receive.setName(send.getName());
        ctx.writeAndFlush(receive);
    }
}
client
/**
 * Netty client that connects to 127.0.0.1:8765, sends five {@link Send} objects serialized with
 * JBoss Marshalling and then waits for the connection to close.
 */
public class Client {
    /**
     * Connects, sends the messages and blocks until the channel is closed.
     *
     * @param args unused
     * @throws InterruptedException if the thread is interrupted while connecting or while waiting
     *         for the channel to close
     */
    public static void main(String[] args) throws InterruptedException {
        EventLoopGroup worker = new NioEventLoopGroup();
        try {
            Bootstrap b = new Bootstrap();
            b.group(worker)
                    .channel(NioSocketChannel.class)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel sc) throws Exception {
                            // Alternative delimiter-based string framing, left disabled:
                            //ByteBuf buf = Unpooled.copiedBuffer("$_".getBytes());
                            //sc.pipeline().addLast(new DelimiterBasedFrameDecoder(1024,buf));
                            //sc.pipeline().addLast(new StringDecoder());
                            sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingDecoder());
                            sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingEncoder());
                            sc.pipeline().addLast(new ClientHandler());
                        }
                    });
            ChannelFuture f = b.connect("127.0.0.1", 8765).sync();
            for (int i = 1; i <= 5; i++) {
                Send send = new Send();
                send.setId(i);
                send.setMessage("message" + i);
                send.setName("name" + i);
                f.channel().writeAndFlush(send);
            }
            // Block until the connection is closed.
            f.channel().closeFuture().sync();
        } finally {
            // Release the event loop threads even when connect fails or the thread is
            // interrupted; otherwise they are leaked and keep the JVM alive.
            worker.shutdownGracefully();
        }
    }
}
clientHandler
/**
 * Client-side business handler: prints every {@link Receive} sent back by the server and closes
 * the channel when an error is raised in the pipeline.
 */
public class ClientHandler extends ChannelHandlerAdapter {

    /**
     * Prints the server's reply.
     *
     * @param ctx context of the channel the reply arrived on
     * @param msg the inbound message; cast to {@link Receive}
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        final Receive reply = (Receive) msg;
        System.out.println("server反饋:" + reply);
    }

    /**
     * Prints the stack trace of the failure and closes the channel.
     *
     * @param ctx   context of the failing channel
     * @param cause the error raised in the pipeline
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}
send
/**
 * Serializable request object carrying an id, a name and a message.
 */
public class Send implements Serializable {

    /** Serialization version identifier. */
    private static final long serialVersionUID = 1L;

    private Integer id;
    private String name;
    private String message;

    /** @return the id, or {@code null} if it has not been set */
    public Integer getId() {
        return this.id;
    }

    /** @return the name, or {@code null} if it has not been set */
    public String getName() {
        return this.name;
    }

    /** @return the message, or {@code null} if it has not been set */
    public String getMessage() {
        return this.message;
    }

    /** @param id the id to store */
    public void setId(Integer id) {
        this.id = id;
    }

    /** @param name the name to store */
    public void setName(String name) {
        this.name = name;
    }

    /** @param message the message to store */
    public void setMessage(String message) {
        this.message = message;
    }

    /**
     * Renders all fields in the form {@code Send [id=..., name=..., message=...]}.
     *
     * @return the textual representation of this object
     */
    @Override
    public String toString() {
        StringBuilder text = new StringBuilder("Send [id=");
        text.append(id);
        text.append(", name=").append(name);
        text.append(", message=").append(message);
        text.append("]");
        return text.toString();
    }
}
receive
/**
 * Serializable response object carrying an id, a name, a message and a byte array.
 */
public class Receive implements Serializable {

    /** Serialization version identifier. */
    private static final long serialVersionUID = 1L;

    private Integer id;
    private String name;
    private String message;
    // NOTE(review): the purpose of this byte payload is not evident from this file.
    private byte[] sss;

    /** @return the id, or {@code null} if it has not been set */
    public Integer getId() {
        return this.id;
    }

    /** @param id the id to store */
    public void setId(Integer id) {
        this.id = id;
    }

    /** @return the name, or {@code null} if it has not been set */
    public String getName() {
        return this.name;
    }

    /** @param name the name to store */
    public void setName(String name) {
        this.name = name;
    }

    /** @return the message, or {@code null} if it has not been set */
    public String getMessage() {
        return this.message;
    }

    /** @param message the message to store */
    public void setMessage(String message) {
        this.message = message;
    }

    /** @return the stored byte array itself (not a copy), or {@code null} if it has not been set */
    public byte[] getSss() {
        return this.sss;
    }

    /** @param sss the byte array to store; the reference is kept as-is, not copied */
    public void setSss(byte[] sss) {
        this.sss = sss;
    }

    /**
     * Renders all fields in the form {@code Receive [id=..., name=..., message=..., sss=...]}.
     *
     * @return the textual representation of this object
     */
    @Override
    public String toString() {
        StringBuilder text = new StringBuilder("Receive [id=");
        text.append(id);
        text.append(", name=").append(name);
        text.append(", message=").append(message);
        text.append(", sss=").append(Arrays.toString(sss));
        text.append("]");
        return text.toString();
    }
}
marshalling工廠類
/**
 * Factory for the Netty codec handlers that serialize and deserialize objects with JBoss
 * Marshalling.
 */
public final class MarshallingCodeCFactory {

    /** Name of the marshaller factory to look up; "serial" selects the Java-serialization one. */
    private static final String FACTORY_NAME = "serial";

    /** Version number set on the marshalling configuration. */
    private static final int CONFIGURATION_VERSION = 5;

    /** Maximum length of a single serialized message accepted by the decoder. */
    private static final int MAX_OBJECT_SIZE = 1024;

    /** Utility class; not meant to be instantiated. */
    private MarshallingCodeCFactory() {
    }

    /**
     * Creates the JBoss Marshalling decoder.
     *
     * @return a new {@code MarshallingDecoder} limited to {@value #MAX_OBJECT_SIZE} per message
     * @throws IllegalStateException if the "serial" marshaller factory is not available
     */
    public static MarshallingDecoder buildMarshallingDecoder() {
        // The provider is built from the marshaller factory and the configuration.
        UnmarshallerProvider provider =
                new DefaultUnmarshallerProvider(lookupMarshallerFactory(), newConfiguration());
        // Arguments: the provider and the maximum length of one serialized message.
        return new MarshallingDecoder(provider, MAX_OBJECT_SIZE);
    }

    /**
     * Creates the JBoss Marshalling encoder, which serializes POJOs implementing
     * {@code Serializable} into bytes.
     *
     * @return a new {@code MarshallingEncoder}
     * @throws IllegalStateException if the "serial" marshaller factory is not available
     */
    public static MarshallingEncoder buildMarshallingEncoder() {
        MarshallerProvider provider =
                new DefaultMarshallerProvider(lookupMarshallerFactory(), newConfiguration());
        return new MarshallingEncoder(provider);
    }

    /** Looks up the marshaller factory via the static helper, failing fast if it is missing. */
    private static MarshallerFactory lookupMarshallerFactory() {
        MarshallerFactory marshallerFactory = Marshalling.getProvidedMarshallerFactory(FACTORY_NAME);
        if (marshallerFactory == null) {
            // Without this check the missing factory only shows up later as an obscure
            // NullPointerException when the first message is encoded or decoded.
            throw new IllegalStateException(
                    "No JBoss Marshalling factory named \"" + FACTORY_NAME
                            + "\" found; is jboss-marshalling-serial on the classpath?");
        }
        return marshallerFactory;
    }

    /** Creates a marshalling configuration with the version used by both codecs. */
    private static MarshallingConfiguration newConfiguration() {
        MarshallingConfiguration configuration = new MarshallingConfiguration();
        configuration.setVersion(CONFIGURATION_VERSION);
        return configuration;
    }
}
運行結果
server反饋:Receive [id=1, name=name1, message=message1, sss=null]
server反饋:Receive [id=2, name=name2, message=message2, sss=null]
server反饋:Receive [id=3, name=name3, message=message3, sss=null]
server反饋:Receive [id=4, name=name4, message=message4, sss=null]
server反饋:Receive [id=5, name=name5, message=message5, sss=null]
