2020年初 疫情的突然襲來、讓人們都宅在家里,越來越多的公司、平台上線了直播電商的業務。筆者的公司也打算做小程序直播訂單的業務。直播互動、點贊功能、統計用戶在線時長的頻率(1/1min)的心跳消息打算用websocket 來實現。
項目中使用的spring結合websocket配置。直播的互動消息和聊天室的功能很像。網上也有很多demo 可以參考。
在服務器集群的環境下,會涉及到WebSocket Session共享的問題。針對這個問題想到一個比較簡單的解決辦法是。
1、關於在線人數用redis集群去維護在線人數 ,可以根據活動id (或者聊天室id)等構件key-value 唯一鍵值對,通過人數遞增、遞減去維護。
2、關於發送消息,可能會存在同一直播間的用戶被nginx 負載均衡到不同的機器上,也就會出現 一個直播活動(聊天室) 同時被創建並存在 A 服務器 和 B 服務器上 所以就要想辦法讓處於兩個服務器上的用戶像在一台上一樣,可以交流。這里發彈幕(發消息)、上線通知、點贊、用戶心跳信息等消息 都是用kafka 去處理。消息的類型可以用一個type字段進行區分, producer 把消息發送到 一個topic中。 然后kafka consumer 去接收這個topic 的消息去處理,這里面接收的時候 consumer 需要設置不同的group_id(可以設置成服務器ip)確保都接收到消息。接收到消息在判斷是否存在 該活動直播(聊天室),如果有則處理推送消息等相關操作。下面是部分功能代碼
1、websocket (連接、斷開連接、發送消息 、接收消息、在線人數)

1 /** 2 * 3 * webSocket消息 4 * @author kim 5 * @date 2020/3/12 6 */ 7 @Component 8 @Slf4j 9 @ServerEndpoint("/websocket/{activityId}/{username}") 10 public class WebSocket { 11 12 public static RedisUtil redisUtil; 13 14 private static String liveMessageTopic = "liveMessageTopic"; 15 16 public static ShopLiveMessageService shopLiveMessageService; 17 18 /** 19 * 使用map來收集session,key為直播活動id,value為同一個活動的用戶集合 20 */ 21 private static Map<Integer, Set<Session>> lives = new ConcurrentHashMap<>(); 22 23 @Autowired 24 private DocSensitiveService docSensitiveService; 25 26 /** 27 * 建立連接 28 * 29 * @param session 30 */ 31 @OnOpen 32 public void onOpen(@PathParam("activityId")Integer activityId, @PathParam("username") String username, Session session) 33 { 34 //onlineNumber++; 35 if (!lives.containsKey(activityId)){ 36 //創建一個新的活動房間 37 Set<Session> room = new HashSet<Session>(); 38 room.add(session); 39 lives.put(activityId, room); 40 // 初始該活動點贊數 41 updateZanNum(activityId, 0); 42 }else { 43 lives.get(activityId).add(session); 44 } 45 // 該活動在線人數+1 46 updateOnlineNum(activityId); 47 log.info("現在來直播活動"+activityId+"連接的客戶id:"+session.getId()+"用戶名:"+username); 48 log.info("有新連接加入! 當前在線人數" + getOnlineNum(activityId)); 49 try { 50 //messageType 1代表上線 2代表下線 3代表在線名單 4代表普通消息 5 在線人數 51 //先給所有人發送通知,說我上線了 52 String dateStr = TimeUtils.date2Str(new Date(), "yyyy-MM-dd HH:mm:ss"); 53 LiveMessageProduct liveMessageProduct = new LiveMessageProduct(); 54 liveMessageProduct.setNickname(username); 55 liveMessageProduct.setActivityId(activityId); 56 liveMessageProduct.setCreateTime(dateStr); 57 liveMessageProduct.setUpdateTime(dateStr); 58 liveMessageProduct.setMessageType(1); 59 shopLiveMessageService.liveMessageSendToKafka(liveMessageProduct, liveMessageTopic); 60 //給所有人發一條消息:更新在線人數 61 sendOnlineNum(activityId); 62 // 給自己發:目前活動點贊數量 63 Map<String,Object> map3 = new HashMap<>(); 64 map3.put("messageType",6); 65 map3.put("zanNumber",getZanNum(activityId)); 66 sendMessageTo(JSON.toJSONString(map3), session); 67 } 68 catch (IOException e){ 69 log.error(username+"上線的時候通知所有人發生了錯誤"); 70 } 71 } 72 73 @OnError 74 public void onError(Session session, Throwable error) { 75 log.error("服務端發生了錯誤"+error); 76 //error.printStackTrace(); 77 } 78 /** 79 * 連接關閉 80 */ 81 @OnClose 82 public void onClose(@PathParam("activityId")Integer activityId, @PathParam("username") String username, Session session) 83 { 84 // 更新在線人數 85 decrOnlineNum(activityId); 86 lives.get(activityId).remove(session); 87 try { 88 //messageType 1代表上線 2代表下線 3代表在線名單 4代表普通消息 5 在線人數 89 //發送kafka 在線人數 90 sendOnlineNum(activityId); 91 } 92 catch (Exception e){ 93 log.error(username+"下線的時候通知所有人發生了錯誤"); 94 } 95 log.info("有連接關閉! 當前在線人數" + getOnlineNum(activityId)); 96 } 97 98 /** 99 * 收到客戶端的消息 100 * 101 * @param message 消息 102 * @param session 會話 103 */ 104 @OnMessage 105 public void onMessage(@PathParam("activityId")Integer activityId, @PathParam("username") String username, String message, Session session) 106 { 107 try { 108 log.info("來自客戶端消息:" + message+"客戶端的id是:"+session.getId()); 109 JSONObject jsonObject = JSON.parseObject(message); 110 Integer companyId = Integer.parseInt(jsonObject.getString("company_id")); 111 String openid = jsonObject.getString("openid"); 112 String unionid = jsonObject.getString("unionid"); 113 String avatar = jsonObject.getString("avatar"); 114 Integer messageType = jsonObject.getInteger("messageType"); 115 //messageType 1代表上線 2代表下線 3代表在線名單 4代表普通消息 5 在線人數 6點贊 7 心跳包 116 LiveMessageProduct liveMessageProduct = new LiveMessageProduct(); 117 if (messageType.equals(4)){ 118 String textMessage = jsonObject.getString("message"); 119 liveMessageProduct.setContent(textMessage); 120 }else if (messageType.equals(6)){ 121 updateZanNum(activityId, 1); 122 liveMessageProduct.setZanNumber(getZanNum(activityId)); 123 } 124 // 消息同步kafka 125 String dateStr = TimeUtils.date2Str(new Date(), "yyyy-MM-dd HH:mm:ss"); 126 liveMessageProduct.setNickname(username); 127 liveMessageProduct.setActivityId(activityId); 128 liveMessageProduct.setCompanyId(companyId); 129 liveMessageProduct.setOpenid(openid); 130 liveMessageProduct.setUnionid(unionid); 131 liveMessageProduct.setAvatar(avatar); 132 liveMessageProduct.setCreateTime(dateStr); 133 liveMessageProduct.setUpdateTime(dateStr); 134 liveMessageProduct.setMessageType(messageType); 135 shopLiveMessageService.liveMessageSendToKafka(liveMessageProduct, liveMessageTopic); 136 shopLiveMessageService.liveMessageSendToKafka(liveMessageProduct, "saveLiveMessageTopic"); 137 } 138 catch (Exception e){ 139 log.error("發生了錯誤了-{}", e); 140 } 141 142 } 143 144 /** 145 * 146 * 獲取在線人數 147 * @param activityId 直播id 148 * @author kim 149 * @date 2020/4/2 150 * @return java.lang.Long 151 */ 152 private Long getOnlineNum(Integer activityId){ 153 long onlineNum = 0; 154 String redisKey = String.format("LiveActivityOnlineNum:%s",activityId.toString()); 155 if (redisUtil.hasKey(redisKey)){ 156 onlineNum = Long.parseLong(redisUtil.get(redisKey).toString()); 157 } 158 return onlineNum; 159 } 160 161 /** 162 * 163 * 獲取點贊數 164 * @param activityId 直播id 165 * @author kim 166 * @date 2020/4/2 167 * @return java.lang.Long 168 */ 169 private Long getZanNum(Integer activityId){ 170 long zanNum = 0; 171 String redisKey = String.format("LiveActivityZanNum:%s",activityId.toString()); 172 if (redisUtil.hasKey(redisKey)){ 173 zanNum = Long.parseLong(redisUtil.get(redisKey).toString()); 174 } 175 return zanNum; 176 } 177 /** 178 * 179 * redis 中寫入該直播在線人數 初始或者遞增 180 * @param activityId 直播id 181 * @author kim 182 * @date 2020/4/2 183 */ 184 private void updateOnlineNum(Integer activityId){ 185 String redisKey = String.format("LiveActivityOnlineNum:%s",activityId.toString()); 186 if (!redisUtil.hasKey(redisKey)){ 187 redisUtil.set(redisKey,1); 188 }else { 189 redisUtil.incrBy(redisKey,1L); 190 } 191 } 192 /** 193 * 194 * 遞減在線人數 195 * @param activityId 直播id 196 * @author kim 197 * @date 2020/4/2 198 */ 199 private void decrOnlineNum(Integer activityId){ 200 String redisKey = String.format("LiveActivityOnlineNum:%s",activityId.toString()); 201 if (redisUtil.hasKey(redisKey)){ 202 redisUtil.decrBy(redisKey,1L); 203 } 204 } 205 206 /** 207 * 208 * redis 中寫入該直播點贊人數 初始或者遞增 209 * @param activityId 直播id 210 * @param num 點贊數 211 * @author kim 212 * @date 2020/4/2 213 */ 214 private void updateZanNum(Integer activityId, Integer num){ 215 String redisKey = String.format("LiveActivityZanNum:%s",activityId.toString()); 216 System.out.println(redisUtil.hasKey(redisKey)); 217 if (!redisUtil.hasKey(redisKey)){ 218 redisUtil.set(redisKey,num); 219 }else { 220 redisUtil.incrBy(redisKey,num.longValue()); 221 } 222 } 223 224 /** 225 * 226 * 發送kafka 在線人數消息 227 * @param activityId 直播id 228 * @author kim 229 * @date 2020/4/2 230 */ 231 private void sendOnlineNum(Integer activityId){ 232 String dateStr = TimeUtils.date2Str(new Date(), "yyyy-MM-dd HH:mm:ss"); 233 LiveMessageProduct liveMessageProduct5 = new LiveMessageProduct(); 234 liveMessageProduct5.setActivityId(activityId); 235 liveMessageProduct5.setCreateTime(dateStr); 236 liveMessageProduct5.setUpdateTime(dateStr); 237 liveMessageProduct5.setMessageType(5); 238 liveMessageProduct5.setOnLineNumber(getOnlineNum(activityId)); 239 shopLiveMessageService.liveMessageSendToKafka(liveMessageProduct5, liveMessageTopic); 240 } 241 242 /** 243 * 244 * 點對點發消息 245 * @param message 消息內容 246 * @param session session 247 * @author kim 248 * @date 2020/4/2 249 */ 250 private void sendMessageTo(String message, Session session) throws IOException { 251 session.getAsyncRemote().sendText(message); 252 } 253 254 /** 255 * 256 * 給同一直播間用戶發送消息 257 * @param message 消息內容 258 * @param activityId 直播id 259 * @author kim 260 * @date 2020/4/2 261 */ 262 public void sendMessageByKafka(String message, Integer activityId)throws IOException { 263 if (lives.containsKey(activityId)){ 264 for (Session session: lives.get(activityId)){ 265 session.getBasicRemote().sendText(jsonObject.toJSONString()); 266 } 267 } 268 } 269 270 }
2、直播互動消息發送kafka

1 /** 2 * 3 * 直播互動消息發送kafka 4 * @param liveMessageProduct 5 * @author kim 6 * @date 2020/3/27 7 * @return void 8 */ 9 @Override 10 public void liveMessageSendToKafka(LiveMessageProduct liveMessageProduct, String topic){ 11 String liveMessageStr = JSONObject.toJSONString(liveMessageProduct); 12 log.info("直播互動消息---{}", liveMessageStr); 13 try { 14 kafkaTemplate.send(topic, liveMessageStr); 15 } catch (Exception e) { 16 log.error("直播互動消息發送kafka[{}]-失敗:{}",topic, liveMessageStr); 17 } 18 }
3、直播互動消息數據kafka監聽處理

1 @Slf4j 2 @Component 3 public class LiveMessageConsumer { 4 5 @Autowired 6 ShopLiveMessageService shopLiveMessageService; 7 8 @Autowired 9 WebSocket webSocket; 10 /** 11 * 直播互動消息數據監聽處理 12 * 13 * @param records 14 */ 15 @KafkaListener(topics = "liveMessageTopic", groupId = "#{getGroupId.getLocalHost()}",containerFactory = "liveKafkaListenerContainerFactory") 16 public void listenModelEvent(List<ConsumerRecord<?, ?>> records, Acknowledgment ack){ 17 //立即commit,保證最多接收一次 18 ack.acknowledge(); 19 for (ConsumerRecord<?,?> record : records){ 20 String key = (String) record.key(); 21 String value = (String) record.value(); 22 JSONObject jsonObject = JSONObject.parseObject(value); 23 System.out.println("當前消息內容:"+jsonObject.toJSONString()); 24 try { 25 Integer activityId = jsonObject.getInteger("activity_id"); 26 Integer messageType = jsonObject.getInteger("messageType"); 27 Map<String,Object> map = new HashMap<>(16); 28 map.put("messageType",messageType); 29 switch (messageType){ 30 case 6: 31 // 推送websocket 點贊人數 32 map.put("zanNumber", jsonObject.getBigInteger("zanNumber")); 33 break; 34 case 5: 35 // 更新在線人數 推送websocket消息 36 map.put("onlineNumber", jsonObject.getBigInteger("onLineNumber")); 37 break; 38 case 4: 39 // 推送websocket消息 40 map.put("textMessage",jsonObject.getString("content")); 41 map.put("avatar", jsonObject.getString("avatar")); 42 map.put("fromusername",jsonObject.getString("nickname")); 43 break; 44 case 1: 45 // 上線 推送websocket消息 46 map.put("username", jsonObject.getString("nickname")); 47 break; 48 default: 49 } 50 webSocket.sendMessageByKafka(JSON.toJSONString(map), activityId); 51 } catch (Exception e) { 52 log.error(e.getMessage(),key); 53 } 54 } 55 } 56 }
4、獲取服務器ip代碼

1 @Service 2 @Slf4j 3 public class GetGroupId { 4 5 public String getLocalHost() { 6 List<String> ipList = new ArrayList<String>(); 7 InetAddress[] addrList; 8 String finalIp = ""; 9 try 10 { 11 Enumeration interfaces= NetworkInterface.getNetworkInterfaces(); 12 while(interfaces.hasMoreElements()) 13 { 14 NetworkInterface ni=(NetworkInterface)interfaces.nextElement(); 15 Enumeration ipAddrEnum = ni.getInetAddresses(); 16 while(ipAddrEnum.hasMoreElements()) 17 { 18 InetAddress addr = (InetAddress)ipAddrEnum.nextElement(); 19 if (addr.isLoopbackAddress() == true) 20 { 21 continue; 22 } 23 24 String ip = addr.getHostAddress(); 25 if (ip.indexOf(":") != -1) 26 { 27 //skip the IPv6 addr 28 continue; 29 } 30 log.debug("Interface: " + ni.getName() 31 + ", IP: " + ip); 32 ipList.add(ip); 33 } 34 } 35 Collections.sort(ipList); 36 if (ipList.size()>0){ 37 finalIp = ipList.get(0); 38 } 39 } 40 catch (Exception e) 41 { 42 e.printStackTrace(); 43 log.error("Failed to get local ip list. " + e.getMessage()); 44 throw new RuntimeException("Failed to get local ip list"); 45 } 46 return finalIp; 47 } 48 }