客戶端 Client 登錄和響應處理
瘋狂創客圈 Java 分布式聊天室【 億級流量】實戰系列之 17【 博客園 總入口 】
源碼IDEA工程獲取鏈接: Java 聊天室 實戰 源碼
寫在前面
大家好,我是作者尼恩。目前和幾個小伙伴一起,組織了一個高並發的實戰社群【瘋狂創客圈】。正在開始高並發、億級流程的 IM 聊天程序 學習和實戰
前面,已經完成一個高性能的 Java 聊天程序的四件大事:
-
完成了協議選型,選擇了性能更佳的 Protobuf協議。具體的文章為: Netty+Protobuf 整合一:實戰案例,帶源碼
-
介紹了 通訊消息數據包的幾條設計准則。具體的文章為: Netty +Protobuf 整合二:protobuf 消息通訊協議設計的幾個准則
-
解決了一個非常基礎的問題,這就是通訊的 粘包和半包問題。具體的文章為:Netty 粘包/半包 全解 | 史上最全解讀
-
前一篇文件,已經完成了 系統三大組成模塊的組成介紹。 具體的文章為:Netty聊天程序(實戰一):從0開始實戰100w級流量應用
今天介紹非常重要的一個內容:
客戶端的通訊、登錄請求和登錄響應設計。
下面,開啟今天的 驚險和刺激實戰之旅。
客戶端的會話管理
什么是會話?
為了方便客戶端的開發,管理與服務器的連接,這里引入一個非常重要的中間角色——Session (會話)。有點兒像Web開發中的Tomcat的服務器 Session,但是又有很大的不同。
客戶端的會話概念圖,如下圖所示:
客戶端會話有兩個很重的成員,一個是user,代表了擁有會話的用戶。一個是channel,代表了連接的通道。兩個成員的作用是:
-
通過user,可以獲得當前的用戶信息
-
通過channel,可以向服務器發送消息
所以,會話左擁右抱,左手用戶資料,右手服務器的連接。在本例的開發中,會經常用到。
客戶端的邏輯構成
從邏輯上來說,客戶端有三個子的功能模塊。
模塊一:Handler
入站處理器。
在Netty 中非常重要,負責處理入站消息。比方,服務器發送過來登錄響應,服務器發送過來的聊天消息。
模塊二:MsgBuilder
消息組裝器。
將 Java 內部的 消息 Bean 對象,轉成發送出去的 Protobuf 消息。
模塊三:Sender
消息發送器。
Handler 負責收的工作。Sender 則是負責將消息發送出去。
三大子模塊的類關系圖:
介紹完成了主要的組成部分后,開始服務器的連接和Session 的創建。
連接服務器與Session 的創建
通過bootstrap 幫助類,設置完成線程組、通道類型,向管道流水線加入處理器Handler后,就可以開始連接服務器的工作。
本小節需要重點介紹的,是連接成功之后,創建 Session,並且將 Session和 channel 相互綁定。
代碼如下:
package com.crazymakercircle.chat.client;
//...
@Data
@Service("EchoClient")
public class ChatClient
{
static final Logger LOGGER =
LoggerFactory.getLogger(ChatClient.class);
//..
private Channel channel;
private ClientSender sender;
public void doConnect(Bootstrap bootstrap, EventLoopGroup eventLoopGroup)
{
ChannelFuture f = null;
try
{
if (bootstrap != null)
{
bootstrap.group(eventLoopGroup);
bootstrap.channel(NioSocketChannel.class);
bootstrap.option(ChannelOption.SO_KEEPALIVE, true);
bootstrap.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
bootstrap.remoteAddress(host, port);
// 設置通道初始化
bootstrap.handler(
new ChannelInitializer<SocketChannel>()
{
public void initChannel(SocketChannel ch) throws Exception
{
ch.pipeline().addLast(new ProtobufDecoder());
ch.pipeline().addLast(new ProtobufEncoder());
ch.pipeline().addLast(chatClientHandler);
}
}
);
LOGGER.info(new Date() + "客戶端開始登錄[瘋狂創客圈IM]");
f = bootstrap.connect().addListener((ChannelFuture futureListener) ->
{
final EventLoop eventLoop = futureListener.channel().eventLoop();
if (!futureListener.isSuccess())
{
LOGGER.info("與服務端斷開連接!在10s之后准備嘗試重連!");
eventLoop.schedule(() -> doConnect(new Bootstrap(), eventLoop), 10, TimeUnit.SECONDS);
initFalg = false;
}
else
{
initFalg = true;
}
if (initFalg)
{
LOGGER.info("EchoClient客戶端連接成功!");
LOGGER.info(new Date() + ": 連接成功,啟動控制台線程……");
channel = futureListener.channel();
// 創建會話
ClientSession session = new ClientSession(channel);
channel.attr(ClientSession.SESSION).set(session);
session.setUser(ChatClient.this.getUser());
startConsoleThread();
}
});
// 阻塞
f.channel().closeFuture().sync();
}
} catch (Exception e)
{
LOGGER.info("客戶端連接失敗!" + e.getMessage());
}
}
//...
}
Session和 channel 相互綁定
Session和 channel 相互綁定,再截取出來,分析一下。
ClientSession session = new ClientSession(channel);
channel.attr(ClientSession.SESSION).set(session);
session.setUser(ChatClient.this.getUser());
為什么要Session和 channel 相互綁定呢?
- 發的時候, 需要從Session 寫入 Channel ,這相當於正向的綁定。
- 收的時候,是從Channel 過來的,需要找到 Session ,這相當於反向的綁定。
Netty 中的 channel ,實現了AttributeMap接口 ,相當於一個 Map容器。 反向的綁定,利用了channel 的這個特點。
看一下AttributeMap接口 如何使用的?
AttributeMap接口的使用
AttributeMap 是一個接口,並且只有一個attr()方法,接收一個AttributeKey類型的key,返回一個Attribute類型的value。按照Javadoc,AttributeMap實現必須是線程安全的。
AttributeMap內部結構看起來像下面這樣:
不要被嚇着了,其實很簡單。
AttributeMap 的使用,主要是設置和取值。
- 設值 Key-> Value
AttributeMap 的設值的方法,舉例如下:
channel.attr(ClientSession.SESSION).set(session);
這個是鏈式調用,attr() 方法中的是 Key, set()方法中的是Value。 這樣就完成了 Key-> Value 的設置。
- 取值
AttributeMap 的取值的方法,舉例如下:
ClientSession session =
ctx.channel().attr(ClientSession.SESSION).get();
這個是鏈式調用,attr() 方法中的是 Key, get()方法返回 的是Value。 這樣就完成了 取值。
關鍵是,這個key比較特殊。
一般的Map,Key 的類型多半為字符串。但是這里的Key不行,有特殊的約定。
Key的類型必須是 AttributeKey 類型,而且這是一個泛型類,它的優勢是,不需要對值進行強制的類型轉換。
Key的例子如下:
public static final AttributeKey<ClientSession> SESSION = AttributeKey.valueOf("session");
客戶端登錄請求
登錄的請求,大致如下:
ClientSender的 代碼如下:
package com.crazymakercircle.chat.client;
@Service("ClientSender")
public class ClientSender
{
static final Logger LOGGER = LoggerFactory.getLogger(ClientSender.class);
private User user;
private ClientSession session;
public void sendLoginMsg()
{
LOGGER.info("開始登陸");
ProtoMsg.Message message = LoginMsgBuilder.buildLoginMsg(user);
session.writeAndFlush(message);
}
//...
public boolean isLogin()
{
return session.isLogin();
}
}
Sender 首先通過 LoginMsgBuilder,構造一個protobuf 消息。然后調用session發送消息。
session 會通過綁定的channel ,將消息發送出去。
session的代碼,如下:
public synchronized void writeAndFlush(Object pkg)
{
channel.writeAndFlush(pkg);
}
其他的客戶端請求流程,大致也是類似的。
一個客戶端的請求大致的流程有三步,分別從Sender 到session到channel。
處理登錄成功的響應
這是從服務器過來的入站消息。 如果登錄成功,服務器會發送一個登錄成功的響應過來。 這個響應,會從channel 傳遞到Handler。
處理器 LoginResponceHandler 的代碼如下:
package com.crazymakercircle.chat.clientHandler;
//...
public class LoginResponceHandler extends ChannelInboundHandlerAdapter
{
static final Logger LOGGER = LoggerFactory.getLogger(LoginResponceHandler.class);
/**
* 業務邏輯處理
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
{
LOGGER.info("msg:{}", msg.toString());
if (msg != null && msg instanceof ProtoMsg.Message)
{
ProtoMsg.Message pkg = (ProtoMsg.Message) msg;
ProtoMsg.LoginResponse info = pkg.getLoginResponse();
ProtoInstant.ResultCodeEnum result =
ProtoInstant.ResultCodeEnum.values()[info.getCode()];
if (result.equals(ProtoInstant.ResultCodeEnum.SUCCESS))
{
ClientSession session =
ctx.channel().attr(ClientSession.SESSION).get();
session.setLogin(true);
LOGGER.info("登錄成功");
}
}
}
}
LoginResponceHandler 對消息類型進行判斷,如果是請求響應消息,並且登錄成功。 則取出綁定的session,通過session,進一步完成登錄成功后的業務處理。
比如設置成功的狀態,完成一些成功的善后處理操作等等。
其他的客戶端響應處理流程,大致也是類似的。
寫在最后
至此為止,可以看到,客戶端登錄的完整流程。
下一篇:服務器的請求處理和通訊的全流程閉環介紹。
瘋狂創客圈 Java 死磕系列
- Java (Netty) 聊天程序【 億級流量】實戰 開源項目實戰
- Netty 源碼、原理、JAVA NIO 原理
- Java 面試題 一網打盡
- 瘋狂創客圈 【 博客園 總入口 】