石墨文檔:https://shimo.im/docs/tHwJJcvKl2AIiCZD/
(二期)18、開源t-io項目解讀
t-io:
websocket
- 同步
- 異步
- 阻塞
- 非阻塞
- 一個連接一個線程
- 一個請求一個線程
**
Java對BIO、NIO、AIO的支持:
BIO、NIO、AIO適用場景分析:
- ChannelContext(通道上下文)
- GroupContext(服務配置與維護)
- AioHandler(消息處理接口)
- AioListener(通道監聽者)
- Packet(應用層數據包)
- ObjWithLock(自帶讀寫鎖的對象)
t-io是基於tcp層協議的一個網絡框架,所以在應用層與tcp傳輸層之間設計到一個數據的編碼與解碼問題,t-io讓我們能自定義數據協議,所以需要我們自己手動去編碼解碼過程。
https://gitee.com/tywo45/tio-showcase
- 導入核心包
<dependency>
<groupId>org.t-io</groupId>
<artifactId>tio-core</artifactId>
</dependency>
- HelloServerStarter
- HelloServerAioHandler
- HelloPacket
- Const
- HelloClientStarter
- HelloClientAioHandler
- idea請安裝PlantUML intergration插件
(初始化服務器)
(客戶端與服務端通訊流程)
https://gitee.com/tywo45/tio-showcase
LoginReqBody loginReqBody = new LoginReqBody();
loginReqBody.setLoginname(loginname);
loginReqBody.setPassword(password);
ShowcasePacket reqPacket = new ShowcasePacket();
#這里指定消息類型
reqPacket.setType(Type.LOGIN_REQ);
reqPacket.setBody(Json.toJson(loginReqBody).getBytes(ShowcasePacket.CHARSET));
Tio.send(clientChannelContext, reqPacket);
- LoginReqBody
- ShowcasePacket
- clientChannelContext
ShowcasePacket showcasePacket = (ShowcasePacket) packet;
#獲取消息類型
Byte type = showcasePacket.getType();
#根據消息類型找到對應的消息處理類
AbsShowcaseBsHandler<?> showcaseBsHandler = handlerMap.get(type);
if (showcaseBsHandler == null) {
log.error("{}, 找不到處理類,type:{}", channelContext, type);
return;
}
#執行消息處理。消息處理類必須繼承AbsShowcaseBsHandler
showcaseBsHandler.handler(showcasePacket, channelContext);
- handlerMap
- AbsShowcaseBsHandler
private static Map<Byte, AbsShowcaseBsHandler<?>> handlerMap = new HashMap<>();
static {
#把消息類型與消息處理類映射起來
handlerMap.put(Type.GROUP_MSG_REQ, new GroupMsgReqHandler());
handlerMap.put(Type.HEART_BEAT_REQ, new HeartbeatReqHandler());
handlerMap.put(Type.JOIN_GROUP_REQ, new JoinGroupReqHandler());
handlerMap.put(Type.LOGIN_REQ, new LoginReqHandler());
handlerMap.put(Type.P2P_REQ, new P2PReqHandler());
}
log.info("收到點對點請求消息:{}", Json.toJson(bsBody));
ShowcaseSessionContext showcaseSessionContext = (ShowcaseSessionContext) channelContext.getAttribute();
P2PRespBody p2pRespBody = new P2PRespBody();
p2pRespBody.setFromUserid(showcaseSessionContext.getUserid());
p2pRespBody.setText(bsBody.getText());
ShowcasePacket respPacket = new ShowcasePacket();
respPacket.setType(Type.P2P_RESP);
respPacket.setBody(Json.toJson(p2pRespBody).getBytes(ShowcasePacket.CHARSET));
Tio.sendToUser(channelContext.groupContext, bsBody.getToUserid(), respPacket);
項目集成:
<dependency>
<groupId>org.t-io</groupId>
<artifactId>tio-websocket-server</artifactId>
<version>0.0.5-tio-websocket</version>
</dependency>
代碼結構
事件定義
new Object() {
@Subscribe
public void lister(Integer integer) {
System.out.printf("%d from int%n", integer);
}
}
事件發布
//定義事件
final EventBus eventBus = new EventBus();
//注冊事件
eventBus.register(new Object() {
//使用@Subscribe說明訂閱事件處理方法
@Subscribe
public void lister(Integer integer) {
System.out.printf("%s from int%n", integer);
}
@Subscribe
public void lister(Number integer) {
System.out.printf("%s from Number%n", integer);
}
@Subscribe
public void lister(Long integer) {
System.out.printf("%s from long%n", integer);
}
});
//發布事件
eventBus.post(1);
eventBus.post(1L);
項目的而運用
主要處理事件
關鍵類:
調用:
邏輯:
/**
* 添加好友成功之后向對方推送消息
* */
public static void pushAddFriendMessage(long applyid){
if(applyid==0){
return;
}
Apply apply = applyService.getApply(applyid);
ChannelContext channelContext = getChannelContext(""+apply.getUid());
//先判斷是否在線,再去查詢數據庫,減少查詢次數
if (channelContext != null && !channelContext.isClosed()) {
LayimToClientAddFriendMsgBody body = new LayimToClientAddFriendMsgBody();
User user = getUserService().getUser(apply.getToid());
if (user==null){return;}
//對方分組ID
body.setGroupid(apply.getGroup());
//當前用戶的基本信息,用於調用layim.addList
body.setAvatar(user.getAvatar());
body.setId(user.getId());
body.setSign(user.getSign());
body.setType("friend");
body.setUsername(user.getUserName());
push(channelContext, body);
}
}
/**
* 服務端主動推送消息
* */
private static void push(ChannelContext channelContext,Object msg) {
try {
WsResponse response = BodyConvert.getInstance().convertToTextResponse(msg);
Aio.send(channelContext, response);
}catch (IOException ex){
}
}
- 登錄功能
- 單聊功能
- 群聊功能
- 其他自定義消息提醒功能
- 等等。。。。
layim.config({
//初始化接口
init: {
url: '/layim/base'
}
//查看群員接口
,members: {
url: '/layim/members'
}
//上傳圖片接口
,uploadImage: {url: '/upload/file'}
//上傳文件接口
,uploadFile: {url: '/upload/file'}
,isAudio: true //開啟聊天工具欄音頻
,isVideo: true //開啟聊天工具欄視頻
,initSkin: '5.jpg' //1-5 設置初始背景
,notice: true //是否開啟桌面消息提醒,默認false
,msgbox: '/layim/msgbox'
,find: layui.cache.dir + 'css/modules/layim/html/find.html' //發現頁面地址,若不開啟,剔除該項即可
,chatLog: layui.cache.dir + 'css/modules/layim/html/chatLog.html' //聊天記錄頁面地址,若不開啟,剔除該項即可
});
socket.config({
log:true,
token:'/layim/token',
server:'ws://127.0.0.1:8888'
});
org.springframework.boot.autoconfigure.EnableAutoConfiguration= com.fyp.layim.im.server.LayimServerAutoConfig
//初始化t-io的serverGroupContext
//還有消息處理器與消息類型的映射關系
public LayimServerStarter(LayimServerConfig wsServerConfig, IWsMsgHandler wsMsgHandler, TioUuid tioUuid, SynThreadPoolExecutor tioExecutor, ThreadPoolExecutor groupExecutor) throws Exception {
this.layimServerConfig = wsServerConfig;
this.wsMsgHandler = wsMsgHandler;
layimServerAioHandler = new LayimServerAioHandler(wsServerConfig, wsMsgHandler);
layimServerAioListener = new LayimServerAioListener();
serverGroupContext = new ServerGroupContext(layimServerAioHandler, layimServerAioListener, tioExecutor, groupExecutor);
//心跳時間,暫時設置為0
serverGroupContext.setHeartbeatTimeout(wsServerConfig.getHeartBeatTimeout());
serverGroupContext.setName("Tio Websocket Server for LayIM");
aioServer = new AioServer(serverGroupContext);
serverGroupContext.setTioUuid(tioUuid);
//initSsl(serverGroupContext);
//初始化消息處理器
LayimMsgProcessorManager.init();
}
SetWithLock<ChannelContext> checkChannelContexts =
Aio.getChannelContextsByUserid(channelContext.getGroupContext(),body.getId());
private HttpResponse handleHandshakeUserInfo(HttpRequest httpRequest, HttpResponse httpResponse, ChannelContext channelContext) throws Exception {
UserService userService = getUserService();
//增加token驗證方法
String path = httpRequest.getRequestLine().getPath();
String token = URLDecoder.decode(path.substring(1),"utf-8");
String userId = TokenVerify.IsValid(token);
if (userId == null) {
//沒有token 未授權
httpResponse.setStatus(HttpResponseStatus.C401);
} else {
long uid = Long.parseLong(userId);
//解析token
LayimContextUserInfo userInfo = userService.getContextUserInfo(uid);
if (userInfo == null) {
//沒有找到用戶
httpResponse.setStatus(HttpResponseStatus.C404);
} else {
channelContext.setAttribute(userId, userInfo.getContextUser());
//綁定用戶ID
Aio.bindUser(channelContext, userId);
//綁定用戶群組
List<String> groupIds = userInfo.getGroupIds();
//綁定用戶群信息
if (groupIds != null) {
groupIds.forEach(groupId -> Aio.bindGroup(channelContext, groupId));
}
//通知所有好友本人上線了
notify(channelContext,true);
}
}
return httpResponse;
}