目錄
前言
最近做一個項目:
- 大概需求: 多個溫度傳感器不斷向java服務發送溫度數據,該傳感器采用socket發送數據;該數據以$符號開頭和結尾,最后將處理的數據存入數據庫;
- 我想到的處理方式:采用netty來接收和處理數據,然后用mybatis將處理后的數據存入數據庫;
我在這之前從來沒使用過netty,在網上倒是看到不少關於netty的文章,如今就趁着這個項目寫一下我所學到的東西和遇到的問題,又是怎么去解決的;
接下來的幾篇文章都是圍繞着這個項目來寫的;本篇主要寫netty的入門demo;
正文
代碼部分
新建一個maven項目
首先在pom.xml中導入:
<!-- https://mvnrepository.com/artifact/io.netty/netty-all -->
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>5.0.0.Alpha1</version>
</dependency>
服務端
1. DiscardServer類,netty的服務端
public class DiscardServer {
public void run(int port) throws Exception {
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
System.out.println("准備運行端口:" + port);
try {
ServerBootstrap b = new ServerBootstrap();
b = b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 128)
.childHandler(new ChildChannelHandler());
//綁定端口,同步等待成功
ChannelFuture f = b.bind(port).sync();
//等待服務監聽端口關閉
f.channel().closeFuture().sync();
} finally {
//退出,釋放線程資源
workerGroup.shutdownGracefully();
bossGroup.shutdownGracefully();
}
}
public static void main(String[] args) throws Exception {
new DiscardServer().run(8080);
}
}
2. ChildChannelHandler類:
public class ChildChannelHandler extends ChannelInitializer<SocketChannel> {
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline().addLast(new DiscardServerHandler());
}
}
3. DiscardServerHandler類
在這里是繼承的ChannelHandlerAdapter類,當然還可以繼承其他的類,例如SimpleChannelInboundHandler,ChannelInboundHandlerAdapter都可以
public class DiscardServerHandler extends ChannelHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
try {
ByteBuf in = (ByteBuf) msg;
System.out.println("傳輸內容是");
System.out.println(in.toString(CharsetUtil.UTF_8));
ByteBuf resp= Unpooled.copiedBuffer("收到信息$".getBytes());
ctx.writeAndFlush(resp);
} finally {
ReferenceCountUtil.release(msg);
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
// 出現異常就關閉
cause.printStackTrace();
ctx.close();
}
}
啟動netty服務;
好了,到這里就能開始接收數據了;
客服端
1.TimeClient類
public class TimeClient {
public void connect(int port,String host)throws Exception{
//配置客戶端
System.out.println(port+"--"+host);
EventLoopGroup eventLoopGroup=new NioEventLoopGroup();
try {
Bootstrap b=new Bootstrap();
b.group(eventLoopGroup).channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY,true)
.handler(new ChannelInitializer<SocketChannel>() {
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline().addLast(new TimeClientHandler());
}
});
//綁定端口,同步等待成功
ChannelFuture f = b.connect(host,port).sync();
//等待服務監聽端口關閉
f.channel().closeFuture().sync();
}finally {
//優雅退出,釋放線程資源
eventLoopGroup.shutdownGracefully();
}
}
public static void main(String[] args) throws Exception {
new TimeClient().connect(8090,"localhost");
}
}
2.TimeClientHandler 類
public class TimeClientHandler extends ChannelHandlerAdapter {
private byte[] req;
public TimeClientHandler(){
req="$tmb00035ET3318/08/22 11:5704026.75,027.31,20.00,20.00$".getBytes();
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ByteBuf message=null;
for(int i=0;i<100;i++){
message=Unpooled.buffer(req.length);
message.writeBytes(req);
ctx.writeAndFlush(message);
}
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
try {
ByteBuf in = (ByteBuf) msg;
System.out.println(in.toString(CharsetUtil.UTF_8));
} finally {
ReferenceCountUtil.release(msg);
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
// 出現異常就關閉
cause.printStackTrace();
ctx.close();
}
}
在channelActive類中向服務端發送100次消息
先啟動服務端,再啟動客戶端;
測試結果一:
服務端:
傳輸內容是
$tmb00035ET3318/08/22 11:5704026.75,027.31,20.00,20.00$$tmb00035ET3318/08/22 11:5704026.75,027.31,20.00,20.00$$tmb00035ET3318/08/22 11:5704026.75,027.31,20.00,20.00$$tmb00035ET3318/08/22 11:5704026.75,027.31,20.00,20.00$$tmb00035ET3318/08/22 11:5704026.75,027.31,20.00,20.00$$tmb00035ET3318/08/22 11:5704026.75,027.31,20.00,20.00$$tmb00035ET3318/08/22 11:5704026.75,027.31,20.00,20.00$$tmb00035ET3318/08/22 11:5704026.75,027.31,20.00,20.00$$tmb00035ET3318/08/22 11:5704026.75,027.31,20.00,20.00$$tmb00035ET3318/08/22 11:5704026.75,027.31,20.00,20.00$$tmb00035ET3318/08/22 11:5704026.75,027.31,20.00,20.00$$tmb00035ET3318/08/22 11:5704026.75,027.31,20.00,20.00$$tmb00035ET3318/08/22 11:5704026.75,027.31,20.00,20.00$$tmb00035ET3318/08/22 11:5704026.75,027.31,20.00,20.00$$tmb00035ET3318/08/22 11:5704026.75,027.31,20.00,20.00$$tmb00035ET3318/08/22 11:5704026.75,027.31,20.00,20.00$$tmb00035ET3318/08/22 11:5704026.75,027.31,20.00,20.00$$tmb00035ET3318/08/22 11:5704026.75,027.31,20.00,20.00$$tmb00035ET3318/08/22 11:5704026.7
傳輸內容是
5,027.31,20.00,20.00$$tmb00035ET3318/08/22
客戶端:
8080--localhost
收到信息收到信息收到信息收到信息收到信息收到信息收到信息收到信息收到信息收到信息收到信息收到信息收到信息收到信息收到信息收到信息收到信息收到信息收到信息收到信息收到信息收到信息收到信息收到信息收到信息收到信息收到信息收到信息
由於內容太多,就不都貼出來了j,直接寫結果吧:
- 客戶端發送100次數據,但是服務端只收到了28次,然后服務端向客戶端返回28次數據,客戶端卻只收到一次;
- 可以發現服務端接收的數據不是完整接收的,這里出現了拆包,粘包的問題
這里就不討論拆包,粘包了,百度一大堆,相信你也能看明白;
解決粘包,拆包的問題
解決拆包粘包的方法有很多:
- 消息定長,固定每個消息的固定長度
- 在消息末尾使用換行符對消息進行分割,或者使用其他特殊字符來對消息進行分割;
- 將消息分為消息頭和消息體,消息頭中包含標識消息總長度;
- 更復雜的,或者其他的協議。
由於我負責的這個項目戶端發送是由$開始和結束的數據,返回的數據我也設置的$結束,所以我選擇了第二種方法;
只需要在服務端的DiscardServerHandler中和客戶端的ChannelInitializer中添加幾行相同的代碼就行了;
服務端:
public class ChildChannelHandler extends ChannelInitializer<SocketChannel> {
protected void initChannel(SocketChannel socketChannel) throws Exception {
ByteBuf byteBuf= Unpooled.copiedBuffer("$".getBytes());
socketChannel.pipeline().addLast(new DelimiterBasedFrameDecoder(1024,byteBuf));
socketChannel.pipeline().addLast(new DiscardServerHandler());
}
}
客戶端:
在如下的位置添加如下的代碼:
.handler(new ChannelInitializer<SocketChannel>() {
protected void initChannel(SocketChannel socketChannel) throws Exception {
ByteBuf byteBuf= Unpooled.copiedBuffer("$".getBytes());
socketChannel.pipeline().addLast(new DelimiterBasedFrameDecoder(1024,byteBuf));
socketChannel.pipeline().addLast(new TimeClientHandler());
}
});
測試結果
這里我就不發送100次數據了,值發送10次:
服務端:
傳輸內容是
tmb00035ET3318/08/22 11:5704026.75,027.31,20.00,20.00
傳輸內容是
tmb00035ET3318/08/22 11:5704026.75,027.31,20.00,20.00
傳輸內容是
tmb00035ET3318/08/22 11:5704026.75,027.31,20.00,20.00
傳輸內容是
tmb00035ET3318/08/22 11:5704026.75,027.31,20.00,20.00
傳輸內容是
tmb00035ET3318/08/22 11:5704026.75,027.31,20.00,20.00
傳輸內容是
tmb00035ET3318/08/22 11:5704026.75,027.31,20.00,20.00
傳輸內容是
tmb00035ET3318/08/22 11:5704026.75,027.31,20.00,20.00
傳輸內容是
tmb00035ET3318/08/22 11:5704026.75,027.31,20.00,20.00
傳輸內容是
tmb00035ET3318/08/22 11:5704026.75,027.31,20.00,20.00
傳輸內容是
tmb00035ET3318/08/22 11:5704026.75,027.31,20.00,20.00
客戶端:
收到信息
收到信息
收到信息
收到信息
收到信息
收到信息
收到信息
收到信息
收到信息
收到信息
解決我所遇到的問題了;
總結
- 本來我只需要寫服務端的代碼的,但是為了更好的演示,所以我寫了客戶端
- 本篇文章主要就是使用netty發送和接收數據,還有就是拆包和粘包的問題,當然,netty還可以做其他很多的事情;
- netty針對對拆包粘包的問題有很多種解決辦法:例如可以用LineBasedFrameDecoder和StringDecoder組合將信息已換行符來進行拆分;也可以用我上邊的解決方法來解決以特殊字符結束的信息;
- 在解決拆包粘包信息的時候,注意信息是否符合定義的規則,不然會處理不了數據:例如我上邊的例子,如果服務端在返回信息是不以$符結尾的話,客戶端是打印不出來信息的,因為客戶端會認為服務端還沒有發送完信息,會一直等待,而且打印不出數據;
- 這篇文章只是我入門netty的一個小demo,對我還是很有幫助的,當然也希望對閱讀者有那么一點點幫助;
- 有什么不對的地方還請指正,建議也是多多益善;
- 源碼地址
