websocket 集群處理方案


  2020年初 疫情的突然襲來、讓人們都宅在家里,越來越多的公司、平台上線了直播電商的業務。筆者的公司也打算做小程序直播訂單的業務。直播互動、點贊功能、統計用戶在線時長的頻率(1/1min)的心跳消息打算用websocket 來實現。 

  項目中使用的spring結合websocket配置。直播的互動消息和聊天室的功能很像。網上也有很多demo 可以參考。

  在服務器集群的環境下,會涉及到WebSocket Session共享的問題。針對這個問題想到一個比較簡單的解決辦法是。

  1、關於在線人數用redis集群去維護在線人數 ,可以根據活動id (或者聊天室id)等構件key-value 唯一鍵值對,通過人數遞增、遞減去維護。

  2、關於發送消息,可能會存在同一直播間的用戶被nginx 負載均衡到不同的機器上,也就會出現 一個直播活動(聊天室) 同時被創建並存在  A 服務器 和 B 服務器上  所以就要想辦法讓處於兩個服務器上的用戶像在一台上一樣,可以交流。這里發彈幕(發消息)、上線通知、點贊、用戶心跳信息等消息 都是用kafka 去處理。消息的類型可以用一個type字段進行區分, producer 把消息發送到 一個topic中。 然后kafka consumer  去接收這個topic 的消息去處理,這里面接收的時候 consumer 需要設置不同的group_id(可以設置成服務器ip)確保都接收到消息。接收到消息在判斷是否存在 該活動直播(聊天室),如果有則處理推送消息等相關操作。下面是部分功能代碼

 1、websocket (連接、斷開連接、發送消息 、接收消息、在線人數)

  1 /**
  2  *
  3  * webSocket消息
  4  * @author kim
  5  * @date 2020/3/12
  6 */
  7 @Component
  8 @Slf4j
  9 @ServerEndpoint("/websocket/{activityId}/{username}")
 10 public class WebSocket {
 11 
 12     public static RedisUtil redisUtil;
 13 
 14     private static String liveMessageTopic = "liveMessageTopic";
 15 
 16     public static ShopLiveMessageService shopLiveMessageService;
 17 
 18     /**
 19      * 使用map來收集session,key為直播活動id,value為同一個活動的用戶集合
 20     */
 21     private static Map<Integer, Set<Session>> lives = new ConcurrentHashMap<>();
 22 
 23     @Autowired
 24     private DocSensitiveService docSensitiveService;
 25 
 26     /**
 27      * 建立連接
 28      *
 29      * @param session
 30      */
 31     @OnOpen
 32     public void onOpen(@PathParam("activityId")Integer activityId, @PathParam("username") String username, Session session)
 33     {
 34         //onlineNumber++;
 35         if (!lives.containsKey(activityId)){
 36             //創建一個新的活動房間
 37             Set<Session> room = new HashSet<Session>();
 38             room.add(session);
 39             lives.put(activityId, room);
 40             // 初始該活動點贊數
 41             updateZanNum(activityId, 0);
 42         }else {
 43             lives.get(activityId).add(session);
 44         }
 45         // 該活動在線人數+1
 46         updateOnlineNum(activityId);
 47         log.info("現在來直播活動"+activityId+"連接的客戶id:"+session.getId()+"用戶名:"+username);
 48         log.info("有新連接加入! 當前在線人數" + getOnlineNum(activityId));
 49         try {
 50             //messageType 1代表上線 2代表下線 3代表在線名單 4代表普通消息 5 在線人數
 51             //先給所有人發送通知,說我上線了
 52             String dateStr = TimeUtils.date2Str(new Date(), "yyyy-MM-dd HH:mm:ss");
 53             LiveMessageProduct liveMessageProduct = new LiveMessageProduct();
 54             liveMessageProduct.setNickname(username);
 55             liveMessageProduct.setActivityId(activityId);
 56             liveMessageProduct.setCreateTime(dateStr);
 57             liveMessageProduct.setUpdateTime(dateStr);
 58             liveMessageProduct.setMessageType(1);
 59             shopLiveMessageService.liveMessageSendToKafka(liveMessageProduct, liveMessageTopic);
 60             //給所有人發一條消息:更新在線人數
 61             sendOnlineNum(activityId);
 62             // 給自己發:目前活動點贊數量
 63             Map<String,Object> map3 = new HashMap<>();
 64             map3.put("messageType",6);
 65             map3.put("zanNumber",getZanNum(activityId));
 66             sendMessageTo(JSON.toJSONString(map3), session);
 67         }
 68         catch (IOException e){
 69             log.error(username+"上線的時候通知所有人發生了錯誤");
 70         }
 71     }
 72 
 73     @OnError
 74     public void onError(Session session, Throwable error) {
 75         log.error("服務端發生了錯誤"+error);
 76         //error.printStackTrace();
 77     }
 78     /**
 79      * 連接關閉
 80      */
 81     @OnClose
 82     public void onClose(@PathParam("activityId")Integer activityId, @PathParam("username") String username, Session session)
 83     {
 84         // 更新在線人數
 85         decrOnlineNum(activityId);
 86         lives.get(activityId).remove(session);
 87         try {
 88             //messageType 1代表上線 2代表下線 3代表在線名單  4代表普通消息 5 在線人數
 89             //發送kafka 在線人數
 90             sendOnlineNum(activityId);
 91         }
 92         catch (Exception e){
 93             log.error(username+"下線的時候通知所有人發生了錯誤");
 94         }
 95         log.info("有連接關閉! 當前在線人數" + getOnlineNum(activityId));
 96     }
 97 
 98     /**
 99      * 收到客戶端的消息
100      *
101      * @param message 消息
102      * @param session 會話
103      */
104     @OnMessage
105     public void onMessage(@PathParam("activityId")Integer activityId, @PathParam("username") String username, String message, Session session)
106     {
107         try {
108             log.info("來自客戶端消息:" + message+"客戶端的id是:"+session.getId());
109             JSONObject jsonObject = JSON.parseObject(message);
110             Integer companyId = Integer.parseInt(jsonObject.getString("company_id"));
111             String openid = jsonObject.getString("openid");
112             String unionid = jsonObject.getString("unionid");
113             String avatar = jsonObject.getString("avatar");
114             Integer messageType = jsonObject.getInteger("messageType");
115             //messageType 1代表上線 2代表下線 3代表在線名單  4代表普通消息 5 在線人數 6點贊 7 心跳包
116             LiveMessageProduct liveMessageProduct = new LiveMessageProduct();
117             if (messageType.equals(4)){
118                 String textMessage = jsonObject.getString("message");
119                 liveMessageProduct.setContent(textMessage);
120             }else if (messageType.equals(6)){
121                 updateZanNum(activityId, 1);
122                 liveMessageProduct.setZanNumber(getZanNum(activityId));
123             }
124             // 消息同步kafka
125             String dateStr = TimeUtils.date2Str(new Date(), "yyyy-MM-dd HH:mm:ss");
126             liveMessageProduct.setNickname(username);
127             liveMessageProduct.setActivityId(activityId);
128             liveMessageProduct.setCompanyId(companyId);
129             liveMessageProduct.setOpenid(openid);
130             liveMessageProduct.setUnionid(unionid);
131             liveMessageProduct.setAvatar(avatar);
132             liveMessageProduct.setCreateTime(dateStr);
133             liveMessageProduct.setUpdateTime(dateStr);
134             liveMessageProduct.setMessageType(messageType);
135             shopLiveMessageService.liveMessageSendToKafka(liveMessageProduct, liveMessageTopic);
136             shopLiveMessageService.liveMessageSendToKafka(liveMessageProduct, "saveLiveMessageTopic");
137         }
138         catch (Exception e){
139             log.error("發生了錯誤了-{}", e);
140         }
141 
142     }
143 
144     /**
145      *
146      * 獲取在線人數
147      * @param activityId 直播id
148      * @author kim
149      * @date 2020/4/2
150      * @return java.lang.Long
151     */
152     private Long getOnlineNum(Integer activityId){
153         long onlineNum = 0;
154         String redisKey = String.format("LiveActivityOnlineNum:%s",activityId.toString());
155         if (redisUtil.hasKey(redisKey)){
156             onlineNum = Long.parseLong(redisUtil.get(redisKey).toString());
157         }
158         return onlineNum;
159     }
160 
161     /**
162      *
163      * 獲取點贊數
164      * @param activityId 直播id
165      * @author kim
166      * @date 2020/4/2
167      * @return java.lang.Long
168     */
169     private Long getZanNum(Integer activityId){
170         long zanNum = 0;
171         String redisKey = String.format("LiveActivityZanNum:%s",activityId.toString());
172         if (redisUtil.hasKey(redisKey)){
173             zanNum = Long.parseLong(redisUtil.get(redisKey).toString());
174         }
175         return zanNum;
176     }
177     /**
178      *
179      * redis 中寫入該直播在線人數 初始或者遞增
180      * @param activityId 直播id
181      * @author kim
182      * @date 2020/4/2
183     */
184     private void updateOnlineNum(Integer activityId){
185         String redisKey = String.format("LiveActivityOnlineNum:%s",activityId.toString());
186         if (!redisUtil.hasKey(redisKey)){
187             redisUtil.set(redisKey,1);
188         }else {
189             redisUtil.incrBy(redisKey,1L);
190         }
191     }
192     /**
193      *
194      * 遞減在線人數
195      * @param activityId 直播id
196      * @author kim
197      * @date 2020/4/2
198     */
199     private void decrOnlineNum(Integer activityId){
200         String redisKey = String.format("LiveActivityOnlineNum:%s",activityId.toString());
201         if (redisUtil.hasKey(redisKey)){
202             redisUtil.decrBy(redisKey,1L);
203         }
204     }
205 
206     /**
207      *
208      * redis 中寫入該直播點贊人數 初始或者遞增
209      * @param activityId 直播id
210      * @param num 點贊數
211      * @author kim
212      * @date 2020/4/2
213     */
214     private void updateZanNum(Integer activityId, Integer num){
215         String redisKey = String.format("LiveActivityZanNum:%s",activityId.toString());
216         System.out.println(redisUtil.hasKey(redisKey));
217         if (!redisUtil.hasKey(redisKey)){
218             redisUtil.set(redisKey,num);
219         }else {
220             redisUtil.incrBy(redisKey,num.longValue());
221         }
222     }
223 
224     /**
225      *
226      * 發送kafka 在線人數消息
227      * @param activityId 直播id
228      * @author kim
229      * @date 2020/4/2
230     */
231     private void sendOnlineNum(Integer activityId){
232         String dateStr = TimeUtils.date2Str(new Date(), "yyyy-MM-dd HH:mm:ss");
233         LiveMessageProduct liveMessageProduct5 = new LiveMessageProduct();
234         liveMessageProduct5.setActivityId(activityId);
235         liveMessageProduct5.setCreateTime(dateStr);
236         liveMessageProduct5.setUpdateTime(dateStr);
237         liveMessageProduct5.setMessageType(5);
238         liveMessageProduct5.setOnLineNumber(getOnlineNum(activityId));
239         shopLiveMessageService.liveMessageSendToKafka(liveMessageProduct5, liveMessageTopic);
240     }
241 
242     /**
243      *
244      * 點對點發消息
245      * @param message 消息內容
246      * @param session session
247      * @author kim
248      * @date 2020/4/2
249     */
250     private void sendMessageTo(String message, Session session) throws IOException {
251         session.getAsyncRemote().sendText(message);
252     }
253 
254     /**
255      *
256      * 給同一直播間用戶發送消息
257      * @param message 消息內容
258      * @param activityId 直播id
259      * @author kim
260      * @date 2020/4/2
261     */
262     public void sendMessageByKafka(String message, Integer activityId)throws IOException {
263         if (lives.containsKey(activityId)){
264             for (Session session: lives.get(activityId)){
265                 session.getBasicRemote().sendText(jsonObject.toJSONString());
266             }
267         }
268     }
269 
270 }
websocket (連接、斷開連接、發送消息 、接收消息、在線人數)
 2、直播互動消息發送kafka
 1 /**
 2      *
 3      * 直播互動消息發送kafka
 4      * @param liveMessageProduct
 5      * @author kim
 6      * @date 2020/3/27
 7      * @return void
 8     */
 9     @Override
10     public void liveMessageSendToKafka(LiveMessageProduct liveMessageProduct, String topic){
11         String liveMessageStr = JSONObject.toJSONString(liveMessageProduct);
12         log.info("直播互動消息---{}", liveMessageStr);
13         try {
14             kafkaTemplate.send(topic, liveMessageStr);
15         } catch (Exception e) {
16             log.error("直播互動消息發送kafka[{}]-失敗:{}",topic, liveMessageStr);
17         }
18     }
直播互動消息發送kafka
 3、直播互動消息數據kafka監聽處理
 1 @Slf4j
 2 @Component
 3 public class LiveMessageConsumer {
 4 
 5     @Autowired
 6     ShopLiveMessageService shopLiveMessageService;
 7 
 8     @Autowired
 9     WebSocket webSocket;
10     /**
11      * 直播互動消息數據監聽處理
12      *
13      * @param records
14      */
15     @KafkaListener(topics = "liveMessageTopic", groupId = "#{getGroupId.getLocalHost()}",containerFactory = "liveKafkaListenerContainerFactory")
16     public void listenModelEvent(List<ConsumerRecord<?, ?>> records, Acknowledgment ack){
17         //立即commit,保證最多接收一次
18         ack.acknowledge();
19         for (ConsumerRecord<?,?> record : records){
20             String key = (String) record.key();
21             String value = (String) record.value();
22             JSONObject jsonObject = JSONObject.parseObject(value);
23             System.out.println("當前消息內容:"+jsonObject.toJSONString());
24             try {
25                 Integer activityId = jsonObject.getInteger("activity_id");
26                 Integer messageType = jsonObject.getInteger("messageType");
27                 Map<String,Object> map = new HashMap<>(16);
28                 map.put("messageType",messageType);
29                 switch (messageType){
30                     case 6:
31                         // 推送websocket 點贊人數
32                         map.put("zanNumber", jsonObject.getBigInteger("zanNumber"));
33                         break;
34                     case 5:
35                         // 更新在線人數 推送websocket消息
36                         map.put("onlineNumber", jsonObject.getBigInteger("onLineNumber"));
37                         break;
38                     case 4:
39                         // 推送websocket消息
40                         map.put("textMessage",jsonObject.getString("content"));
41                         map.put("avatar", jsonObject.getString("avatar"));
42                         map.put("fromusername",jsonObject.getString("nickname"));
43                         break;
44                     case 1:
45                         // 上線 推送websocket消息
46                         map.put("username", jsonObject.getString("nickname"));
47                         break;
48                     default:
49                 }
50                 webSocket.sendMessageByKafka(JSON.toJSONString(map), activityId);
51             } catch (Exception e) {
52                 log.error(e.getMessage(),key);
53             }
54         }
55     }
56 }   
直播互動消息數據kafak監聽處理

 4、獲取服務器ip代碼

 1 @Service
 2 @Slf4j
 3 public class GetGroupId {
 4 
 5     public String getLocalHost() {
 6         List<String> ipList = new ArrayList<String>();
 7         InetAddress[] addrList;
 8         String finalIp = "";
 9         try
10         {
11             Enumeration interfaces= NetworkInterface.getNetworkInterfaces();
12             while(interfaces.hasMoreElements())
13             {
14                 NetworkInterface ni=(NetworkInterface)interfaces.nextElement();
15                 Enumeration ipAddrEnum = ni.getInetAddresses();
16                 while(ipAddrEnum.hasMoreElements())
17                 {
18                     InetAddress addr = (InetAddress)ipAddrEnum.nextElement();
19                     if (addr.isLoopbackAddress() == true)
20                     {
21                         continue;
22                     }
23 
24                     String ip = addr.getHostAddress();
25                     if (ip.indexOf(":") != -1)
26                     {
27                         //skip the IPv6 addr
28                         continue;
29                     }
30                     log.debug("Interface: " + ni.getName()
31                             + ", IP: " + ip);
32                     ipList.add(ip);
33                 }
34             }
35             Collections.sort(ipList);
36             if (ipList.size()>0){
37                 finalIp = ipList.get(0);
38             }
39         }
40         catch (Exception e)
41         {
42             e.printStackTrace();
43             log.error("Failed to get local ip list. " + e.getMessage());
44             throw new RuntimeException("Failed to get local ip list");
45         }
46         return finalIp;
47     }
48 }
獲取服務器ip

 參考資料:Spring Boot整合websocket實現群聊,點對點聊天


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM