Netty是一個基於NIO,異步的,事件驅動的網絡通信框架。由於使用Java提供 的NIO包中的API開發網絡服務器代碼量大,復雜,難保證穩定性。netty這類的網絡框架應運而生。通過使用netty框架可以快速開發網絡通信服務端,客戶端。
本文主要通過一個簡單的聊天程序來熟悉初步使用Nettyty進行簡單服務端與客戶端的開發。本聊天系統主要功能有點對點聊天及服務端推送消息。
程序結構:
Server端: IMServer 服務器啟動類 ServerHandler 服務端核心類 負責客戶端認證及消息轉發
Client端: IMClient 客戶端啟動類 ClientHandler 客戶端核心類,負責客戶端消息的發送及接收
Coder:MsgPackDecode和MsgPackEncode,負責消息的解碼及編碼實現,消息的編解碼基於第三方庫msgpack
代碼分析:
代碼結構如下

ApplicationContext類
功能比較簡單,主要用來保存登錄用戶信息,以Map來存儲,其中key為用戶ID,value為客戶端對應的ChannelHandlerContext對象。
import io.netty.channel.ChannelHandlerContext;
import java.util.HashMap;
import java.util.Map;
/**
* Created by Andy on 2016/10/8.
*/
public class ApplicationContext {
public static Map<Integer,ChannelHandlerContext> onlineUsers = new HashMap<Integer,ChannelHandlerContext>();
public static void add(Integer uid,ChannelHandlerContext ctx){
onlineUsers.put(uid,ctx);
}
public static void remove(Integer uid){
onlineUsers.remove(uid);
}
public static ChannelHandlerContext getContext(Integer uid){
return onlineUsers.get(uid);
}
}
IMServerConfig接口
該接口主要用來存儲服務端啟動的配置信息,可改為配置文件實現
import com.wavemelody.nettyim.struts.MessageType;
/**
* Created by Andy on 2016/10/8.
*/
public interface IMServerConfig {
/**客戶端配置*/
int CLIENT_VERSION = 1; //版本號
/**服務端配置*/
String SERVER_HOST = "127.0.0.1"; //服務器IP
int SERVER_PORT = 9090; //服務器端口
/**消息相關*/
int SERVER_ID = 0; //表示服務器消息
byte APP_IM = 1; //即時通信應用ID為1
MessageType TYPE_MSG_CONNECT = MessageType.TYPE_AUTH; //連接后第一次消息確認建立連接和發送認證信息
MessageType TYPE_MSG_TEXT = MessageType.TYPE_TEXT; //文本消息
String MSG_DEFAULT = ""; //空消息
}
ServerHandler
服務端主要的消息處理Handler,負責客戶端認證之后,對客戶端信息的保存及客戶端點對點消息的轉發以及程序異常時對資源的關閉等業務。
import com.wavemelody.nettyim.server.core.ApplicationContext;
import com.wavemelody.nettyim.struts.IMMessage;
import com.wavemelody.nettyim.struts.MessageType;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
/**
* Created by Andy on 2016/10/8.
*/
public class ServerHandler extends ChannelInboundHandlerAdapter{
private ChannelHandlerContext ctx;
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
System.out.println("服務端Handler創建。。。");
super.handlerAdded(ctx);
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
System.out.println("channelInactive");
super.channelInactive(ctx);
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
this.ctx = ctx;
super.channelActive(ctx);
System.out.println("有客戶端連接:" + ctx.channel().remoteAddress().toString());
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
IMMessage message = (IMMessage)msg;
if(message.getMsgType() == MessageType.TYPE_AUTH.value()){ //認證消息
System.out.println("認證消息:" + msg);
ApplicationContext.add(message.getUid(),ctx);
}else if(message.getMsgType() == MessageType.TYPE_TEXT.value()){ //CHAT消息
ChannelHandlerContext c = ApplicationContext.getContext(message.getReceiveId());
if(c==null){ //接收方不在線,反饋給客戶端
message.setMsg("對方不在線!");
ctx.writeAndFlush(message);
}else{ //將消轉發給接收方
System.out.println("轉發消息:" + msg);
c.writeAndFlush(message);
}
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
System.out.println("與客戶端斷開連接:"+cause.getMessage());
cause.printStackTrace();
ctx.close();
}
}
IMServer
服務端的啟動類,關於服務端,有以下幾點需要說明
- runServerCMD()方法用來啟動控制台,啟動后,可以對用戶輸入的內容進行消息推送。
- MsgPackEncode和MsgPackDecode用於消息的編解碼。使用的是MessagePack(API使用簡單,編碼后字節流特小,編解碼速度較快,同時幾乎支持所有主流編程語言,詳情見官網:http://msgpack.org/)。這樣我們可以隨意編寫實體用於發送消息,相關代碼后邊給出。
- LengthFieldBasedFrameDecoder和LengthFieldPrepender:因為TCP底層傳輸數據時是不了解上層業務的,所以傳輸消息的時候很容易造成粘包/半包的情況(一條完整的消息被拆開或者完整或者不完整的多條消息被合並到一起發送、接收),這兩個工具就是Netty提供的消息編碼工具,2表示消息長度(不是正真的長度為2,是2個字節)。
import com.wavemelody.nettyim.codec.MsgPackDecode;
import com.wavemelody.nettyim.codec.MsgPackEncode;
import com.wavemelody.nettyim.server.config.IMServerConfig;
import com.wavemelody.nettyim.server.core.ApplicationContext;
import com.wavemelody.nettyim.server.handler.ServerHandler;
import com.wavemelody.nettyim.struts.IMMessage;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import java.io.IOException;
import java.util.Map;
import java.util.Scanner;
/**
* Created by Andy on 2016/10/8.
*/
public class IMServer implements Runnable,IMServerConfig{
public static void main(String[] args) throws IOException{
new IMServer().start();
}
public void start() throws IOException{
new Thread(this).start();
runServerCMD();
}
private IMMessage getMessage(){
int toID = -1;
IMMessage message = new IMMessage(
APP_IM,
CLIENT_VERSION,
SERVER_ID,
TYPE_MSG_TEXT.value(),
toID,
MSG_DEFAULT);
return message;
}
private void runServerCMD()throws IOException{
Scanner scanner = new Scanner(System.in);
IMMessage message = null;
do{
message = getMessage();
message.setMsg(scanner.nextLine());
}while(sendMsg(message));
}
private boolean sendMsg(IMMessage msg){
// 當用戶輸入quit表示退出,不在進行推送
boolean result = msg.getMsg().equals("quit") ? false:true;
if(result){
int receiveID = msg.getReceiveId();
String content = msg.getMsg();
if(content.startsWith("#") && content.indexOf(":") != -1){
try {
/**
* 用戶輸入指定的推送客戶端
* 輸入文本格式為: "#8888:發送內容"
* “#”和“:”之間內容為用戶ID,“:”之后為推送消息內容
*/
receiveID = Integer.valueOf(content.substring(1,content.indexOf(":")));
msg.setReceiveId(receiveID);
msg.setMsg(content.substring(content.indexOf(":")));
} catch (NumberFormatException e) {
//解析失敗則,默認發送所有
e.printStackTrace();
}
}
/**
* 默認推送所有用戶(默認receiveID為-1)
* */
if(receiveID == -1){
System.out.println("推送消息給所有在線用戶:" + msg);
for(Map.Entry<Integer,ChannelHandlerContext> entry: ApplicationContext.onlineUsers.entrySet()){
ChannelHandlerContext c = entry.getValue();
c.writeAndFlush(msg);
}
}else{
ChannelHandlerContext ctx = ApplicationContext.getContext(receiveID);
if(ctx!=null){
System.out.println("推送消息:" + msg);
ctx.writeAndFlush(msg);
}
}
}
return result;
}
@Override
public void run() {
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup,workerGroup).channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG,1024)
.childOption(ChannelOption.SO_KEEPALIVE, true)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast("frameDecoder",new LengthFieldBasedFrameDecoder(65536, 0, 2, 0, 2));
ch.pipeline().addLast("msgpack decoder",new MsgPackDecode());
ch.pipeline().addLast("frameEncoder",new LengthFieldPrepender(2));
ch.pipeline().addLast("msgpack encoder",new MsgPackEncode());
ch.pipeline().addLast(new ServerHandler());
}
});
ChannelFuture f = b.bind(SERVER_PORT).sync();
f.channel().closeFuture().sync();
} catch (Exception e) {
e.printStackTrace();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
ClientHandler
客戶端Handler
import com.wavemelody.nettyim.client.config.IMClientConfig; import com.wavemelody.nettyim.struts.IMMessage; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import java.io.IOException; /** * Created by Andy on 2016/10/8. */ public class ClientHandler extends ChannelInboundHandlerAdapter implements IMClientConfig{ private ChannelHandlerContext ctx; @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { System.out.println("用戶["+ UID + "]成功連接服務器"); this.ctx = ctx; //通道建立時發送認證消息給服務器 IMMessage message = new IMMessage( APP_IM, CLIENT_VERSION, UID, TYPE_MSG_AUTH.value(), DEFAULT_RECEIVE_ID, MSG_DEFAULT); sendMsg(message); } public boolean sendMsg(IMMessage msg) throws IOException { boolean result = msg.getMsg().equals("quit") ? false:true; if(result){ if(msg.getMsgType() != MessageType.TYPE_AUTH.value()){ System.out.println("認證消息: " + "client[" + msg.getUid() + "]:" + msg.getMsg()); } //設置接收端ID和發送消息 if(msg.getMsgType() == MessageType.TYPE_TEXT.value()){ if(msg.getMsg().contains(":")){ String[] msgs = msg.getMsg().split(":"); String receiveIdStr =msgs[0].substring(1); msg.setReceiveId(Integer.valueOf(receiveIdStr)); msg.setMsg(msgs[1]); } } ctx.writeAndFlush(msg); } return result; } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { IMMessage m = (IMMessage)msg; System.out.println("receive[" + m.getUid() + "]:" + m.getMsg()); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { System.out.println("與服務器斷開連接:" + cause.getMessage()); ctx.close(); } }
IMClient類
客戶端啟動類,同時啟動控制台,將用戶輸入消息發送給指定的客戶端
import com.wavemelody.nettyim.client.config.IMClientConfig;
import com.wavemelody.nettyim.client.handler.ClientHandler;
import com.wavemelody.nettyim.codec.MsgPackDecode;
import com.wavemelody.nettyim.codec.MsgPackEncode;
import com.wavemelody.nettyim.struts.IMMessage;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender;
import java.io.IOException;
import java.util.Scanner;
/**
* Created by Andy on 2016/10/8.
*/
public class IMClient implements Runnable,IMClientConfig{
private ClientHandler clientHandler = new ClientHandler();
public static void main(String[] args) throws IOException{
new IMClient().start();
}
public void start() throws IOException{
new Thread(this).start();
runClientCMD();
}
public void runClientCMD() throws IOException{
IMMessage message = new IMMessage(
APP_IM,
CLIENT_VERSION,
UID,
TYPE_MSG_TEXT.value(),
DEFAULT_RECEIVE_ID,
MSG_DEFAULT);
Scanner scanner = new Scanner(System.in);
do{
message.setMsg(scanner.nextLine());
}
while (clientHandler.sendMsg(message));
}
@Override
public void run() {
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
Bootstrap b = new Bootstrap();
b.group(workerGroup)
.channel(NioSocketChannel.class)
.option(ChannelOption.SO_KEEPALIVE, true)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast("frameDecoder", new LengthFieldBasedFrameDecoder(65536, 0, 2, 0, 2));
ch.pipeline().addLast("msgpack decoder",new MsgPackDecode());
ch.pipeline().addLast("frameEncoder", new LengthFieldPrepender(2));
ch.pipeline().addLast("msgpack encoder",new MsgPackEncode());
ch.pipeline().addLast(clientHandler);
}
});
ChannelFuture f = b.connect(SERVER_HOST, SERVER_PORT).sync();
f.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
workerGroup.shutdownGracefully();
}
}
}
IMMessage類
該類即為通訊過程中的消息實體,即通訊協議,客戶端進行消息發送,服務端進行消息推送時都需要將發送內容封裝為IMMessage對象,才可以被識別。
import org.msgpack.annotation.Message;
/**
* Created by Andy on 2016/10/8.
*/
@Message
public class IMMessage {
//應用ID
private byte appId;
//版本
private int version;
//用戶ID
private int uid;
//消息類型 0:登錄 1:文字消息
private byte msgType;
//接收方
private int receiveId;
//消息內容
private String msg;
public IMMessage(){
}
/**
* 構造方法
* @param appId 應用通道
* @param version 應用版本
* @param uid 用戶ID
* @param msgType 消息類型
* @param receiveId 消息接收者
* @param msg 消息內容
*/
public IMMessage(byte appId, int version, int uid, byte msgType, int receiveId, String msg) {
this.appId = appId;
this.version = version;
this.uid = uid;
this.msgType = msgType;
this.receiveId = receiveId;
this.msg = msg;
}
public byte getAppId() {
return appId;
}
public void setAppId(byte appId) {
this.appId = appId;
}
public int getVersion() {
return version;
}
public void setVersion(int version) {
this.version = version;
}
public int getUid() {
return uid;
}
public void setUid(int uid) {
this.uid = uid;
}
public byte getMsgType() {
return msgType;
}
public void setMsgType(byte msgType) {
this.msgType = msgType;
}
public int getReceiveId() {
return receiveId;
}
public void setReceiveId(int receiveId) {
this.receiveId = receiveId;
}
public String getMsg() {
return msg;
}
public void setMsg(String msg) {
this.msg = msg;
}
@Override
public String toString() {
return "IMMessage{" +
"appId=" + appId +
", version=" + version +
", uid=" + uid +
", msgType=" + msgType +
", receiveId=" + receiveId +
", msg='" + msg + '\'' +
'}';
}
}
MessageType類
消息類型,通過枚舉類型來約束消息中消息類型字段內容,防止出現系統不能識別的消息類型而發生異常。
/**
* Created by Andy on 2016/10/9.
*/
public enum MessageType {
TYPE_AUTH((byte)0),TYPE_LOGOUT((byte)1),TYPE_TEXT((byte)2),TYPE_EMPTY((byte)3);
private byte value;
MessageType(byte value){
this.value = value;
}
public byte value(){
return this.value;
}
}
IMClientConfig接口
主要用來定義客戶端啟動配置信息常量,可改為配置文件實現方式。
import com.wavemelody.nettyim.struts.MessageType;
/**
* Created by Andy on 2016/10/9.
*/
public interface IMClientConfig {
/**客戶端配置*/
int CLIENT_VERSION = 1; //版本號
/**服務端配置*/
String SERVER_HOST = "127.0.0.1"; //服務器IP
int SERVER_PORT = 9090; //服務器端口
/**消息相關*/
byte APP_IM = 1; //即時通信應用ID為1
int UID = 8888;
int DEFAULT_RECEIVE_ID = 9999;
MessageType TYPE_MSG_AUTH = MessageType.TYPE_AUTH; //連接后第一次消息確認建立連接和發送認證信息
MessageType TYPE_MSG_TEXT = MessageType.TYPE_TEXT; //文本消息
String MSG_DEFAULT = ""; //默認為空消息
}
MsgPackEncode類
使用msgpack實現對消息的編碼實現。
import com.wavemelody.nettyim.struts.IMMessage;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;
import org.msgpack.MessagePack;
/**
* Created by Andy on 2016/10/8.
*/
public class MsgPackEncode extends MessageToByteEncoder<IMMessage> {
@Override
protected void encode(ChannelHandlerContext channelHandlerContext, IMMessage msg, ByteBuf out) throws Exception {
out.writeBytes(new MessagePack().write(msg));
}
}
MsgPackDecode類
使用msgpack實現對消息的解碼實現。
import com.wavemelody.nettyim.struts.IMMessage;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToMessageDecoder;
import org.msgpack.MessagePack;
import java.util.List;
/**
* Created by Andy on 2016/10/8.
*/
public class MsgPackDecode extends MessageToMessageDecoder<ByteBuf>{
@Override
protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf msg, List<Object> out) throws Exception {
final int length = msg.readableBytes();
final byte[] array = new byte[length];
msg.getBytes(msg.readerIndex(),array,0,length);
out.add(new MessagePack().read(array, IMMessage.class));
}
}
通過以上代碼基本上已經實現了一個簡單的聊天程序,當然還存在很多地方需要優化。一個就是對TCP連接的優化,不是指定SO_KEEPALIVE屬性,而是改為發送心跳消息來維持客戶端和服務器的連接;然后就是鏈路中斷后的重連實現,當出現中斷之后由客戶端等待一定時間重新發起連接操作,直至連接成功;另外一個就是重復登錄驗證,在客戶端已經登錄的情況下,要拒絕重復登錄,防止客戶端在異常狀態下反復重連導致句柄資源被耗盡。
