基於spring reactor3構建的即時通訊api。


歡迎閱讀Rmessage文檔

技術棧

Rmessage是采用Reactor3,基於reactor-netty項目構建的實時消息推送api。

  • 什么是Reactor3?

    Reactor 是一個用於JVM的完全非阻塞的響應式編程框架,具備高效的需求管理(即對 “背壓(backpressure)”的控制)能力。它與 Java 8 函數式 API 直接集成,比如 CompletableFuture, Stream, 以及 Duration。它提供了異步序列 API Flux(用於[N]個元素)和 Mono(用於 [0|1]個元素),並完全遵循和實現了“響應式擴展規范”(Reactive Extensions Specification)。

  • 使用Reactor好處?

    非常容易構建高吞吐量純異步的代碼,還有就是能夠無縫整合spring5[webflux]項目。

項目簡介

使用Rmessage你需要外部管理群組用戶關系,以及離線消息存儲,Rmessage不提供持久化,測試可以使用默認Handler內存保留離線消息。
整個項目采用純異步的編程思想去開發,旨在學習reactive programming。

目前支持的功能

  1. 單聊
  2. 群聊
  3. 離線消息落地以及拉取
  4. 離線在線通知管理
  5. 心跳,連接鑒權機制
  6. 群組關系管理
  7. 支持多端在線
  8. 目前支持tcp協議,項目設計時考慮多協議擴展。
  9. 支持服務端自定義消息攔截
  10. 消息QOS機制(還未實現)

快速開始

  • 服務端
    Rmessage不管理用戶群組之間關系,通過接口暴漏外部,只要實現接口注入即可。
ServerStart
   .builder()
   .tcp()
   .ip("127.0.0.1")
   .port(1888)
   .onReadIdle(10000l) //設置讀心跳時間
   .onWriteIdle(10000l) //設置寫心跳時間
   .option(ChannelOption.SO_RCVBUF,1023)
   .interceptor(frame -> frame,frame -> frame)// 攔截所有message
   .setAfterChannelInit(channel -> {//  channel設置
      })
   .connect()
   .cast(TcpServerSession.class)
   .subscribe(session->{
                    session.addGroupHandler(groupId -> null).subscribe();
                    session.addOfflineHandler(new DefaultOffMessageHandler()).subscribe();
                    session.addUserHandler(new DefaultUserTransportHandler());
  });

具體接口方法定義請參照接口注釋

  1. UserHandler接口是處理用戶登陸校驗等
  2. OfflineHandler 是處理離線消息存儲,以及拉取離線消息的接口
  3. GroupHandler 是獲取群組下所有用戶的接口
  • 客戶端
 ClientStart
     .builder()
     .tcp()
     .ip("127.0.0.1")
     .port(1888)
     .userId("21344")  //設置用戶名
     .password("12312") //設置密碼
     .onReadIdle(10000l,()->()->System.out.println("心跳了"))//設置讀心跳,以及設置回調runner
     .setClientType(ClientType.Ios)//設置客戶端類型
     .setAfterChannelInit(channel -> {
                    //  channel設置
      })
     .connect()
     .cast(TcpClientSession.class)
     .subscribe(session->{
      session.sendPoint("123","測試一下哦").subscribe(); //發送單聊消息
                    session.sendGroup("group1","123").subscribe();  // 發送群聊消息
                    session.accept(message -> {
    }); // 接受所有消息
  });

Github地址  https://github.com/1ssqq1lxr/Rmessage


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM