歡迎閱讀Rmessage文檔
技術棧
Rmessage是采用Reactor3,基於reactor-netty項目構建的實時消息推送api。
-
什么是Reactor3?
Reactor 是一個用於JVM的完全非阻塞的響應式編程框架,具備高效的需求管理(即對 “背壓(backpressure)”的控制)能力。它與 Java 8 函數式 API 直接集成,比如 CompletableFuture, Stream, 以及 Duration。它提供了異步序列 API Flux(用於[N]個元素)和 Mono(用於 [0|1]個元素),並完全遵循和實現了“響應式擴展規范”(Reactive Extensions Specification)。
-
使用Reactor好處?
非常容易構建高吞吐量純異步的代碼,還有就是能夠無縫整合spring5[webflux]項目。
項目簡介
使用Rmessage你需要外部管理群組用戶關系,以及離線消息存儲,Rmessage不提供持久化,測試可以使用默認Handler內存保留離線消息。
整個項目采用純異步的編程思想去開發,旨在學習reactive programming。
目前支持的功能
- 單聊
- 群聊
- 離線消息落地以及拉取
- 離線在線通知管理
- 心跳,連接鑒權機制
- 群組關系管理
- 支持多端在線
- 目前支持tcp協議,項目設計時考慮多協議擴展。
- 支持服務端自定義消息攔截
- 消息QOS機制(還未實現)
快速開始
- 服務端
Rmessage不管理用戶群組之間關系,通過接口暴漏外部,只要實現接口注入即可。
ServerStart
.builder()
.tcp()
.ip("127.0.0.1")
.port(1888)
.onReadIdle(10000l) //設置讀心跳時間
.onWriteIdle(10000l) //設置寫心跳時間
.option(ChannelOption.SO_RCVBUF,1023)
.interceptor(frame -> frame,frame -> frame)// 攔截所有message
.setAfterChannelInit(channel -> {// channel設置
})
.connect()
.cast(TcpServerSession.class)
.subscribe(session->{
session.addGroupHandler(groupId -> null).subscribe();
session.addOfflineHandler(new DefaultOffMessageHandler()).subscribe();
session.addUserHandler(new DefaultUserTransportHandler());
});
具體接口方法定義請參照接口注釋
- UserHandler接口是處理用戶登陸校驗等
- OfflineHandler 是處理離線消息存儲,以及拉取離線消息的接口
- GroupHandler 是獲取群組下所有用戶的接口
- 客戶端
ClientStart
.builder()
.tcp()
.ip("127.0.0.1")
.port(1888)
.userId("21344") //設置用戶名
.password("12312") //設置密碼
.onReadIdle(10000l,()->()->System.out.println("心跳了"))//設置讀心跳,以及設置回調runner
.setClientType(ClientType.Ios)//設置客戶端類型
.setAfterChannelInit(channel -> {
// channel設置
})
.connect()
.cast(TcpClientSession.class)
.subscribe(session->{
session.sendPoint("123","測試一下哦").subscribe(); //發送單聊消息
session.sendGroup("group1","123").subscribe(); // 發送群聊消息
session.accept(message -> {
}); // 接受所有消息
});
Github地址 https://github.com/1ssqq1lxr/Rmessage