WebSocket和kafka實現數據實時推送到前端


一. 需求背景
     最近新接觸一個需求,需要將kafka中的數據實時推送到前端展示。最開始想到的是前端輪詢接口數據,但是無法保證輪詢的頻率和消費的頻率完全一致,或造成數據缺失等問題。最終確定用利用WebSocket實現數據的實時推送。
 
二. websocket簡介
     網上已經有好多介紹WebSocket的文章了,就不詳細介紹了,這里只做簡單介紹。 WebSocket協議是基於TCP的一種新的網絡協議。它實現了瀏覽器與服務器全雙工(full-duplex)通信——允許服務器主動發送信息給客戶端。
 
三. 服務端實現
  1. pom文件
  這里需要引用三個依賴。第一個為WebSocket需要的依賴,另外兩個為kafka的依賴
 

<dependencies>
<!-- webSocket所需依賴 -->
<dependency>
<groupId>javax</groupId>
<artifactId>javaee-api</artifactId>
<version>7.0</version>
</dependency>
<!-- kafka 所需依賴 -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.9.2</artifactId>
<version>0.8.1.1</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>RELEASE</version>
</dependency>
</dependencies>

2. webSocket服務端實現

 1 //此處定義接口的uri
 2 @ServerEndpoint("/wbSocket")
 3 public class WebSocket {
 4     private Session session;
 5     public static CopyOnWriteArraySet<WebSocket> wbSockets = new CopyOnWriteArraySet<WebSocket>(); //此處定義靜態變量,以在其他方法中獲取到所有連接
 6     
 7     /**
 8      * 建立連接。
 9      * 建立連接時入參為session
10      */
11     @OnOpen
12     public void onOpen(Session session){
13         this.session = session;
14         wbSockets.add(this); //將此對象存入集合中以在之后廣播用,如果要實現一對一訂閱,則類型對應為Map。由於這里廣播就可以了隨意用Set
15         System.out.println("New session insert,sessionId is "+ session.getId());
16     }
17     /**
18      * 關閉連接
19      */
20     @OnClose
21     public void onClose(){
22         wbSockets.remove(this);//將socket對象從集合中移除,以便廣播時不發送次連接。如果不移除會報錯(需要測試)
23         System.out.println("A session insert,sessionId is "+ session.getId());
24     }
25     /**
26      * 接收前端傳過來的數據。
27      * 雖然在實現推送邏輯中並不需要接收前端數據,但是作為一個webSocket的教程或叫備忘,還是將接收數據的邏輯加上了。
28      */
29     @OnMessage
30     public void onMessage(String message ,Session session){
31         System.out.println(message + "from " + session.getId());
32     }
33 
34     public void sendMessage(String message) throws IOException {
35         this.session.getBasicRemote().sendText(message);
36     }
37 }

 

3. kafka消費者實現

復制代碼
 1 public class ConsumerKafka extends Thread {
 2 
 3     private KafkaConsumer<String,String> consumer;
 4     private String topic = "kafkaTopic";
 5 
 6     public ConsumerKafka(){
 7 
 8     }
 9 
10     @Override
11     public void run(){
12         //加載kafka消費者參數
13         Properties props = new Properties();
14         props.put("bootstrap.servers", "localhost:9092");
15         props.put("group.id", "ytna");
16         props.put("enable.auto.commit", "true");
17         props.put("auto.commit.interval.ms", "1000");
18         props.put("session.timeout.ms", "15000");
19         props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
20         props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
21         //創建消費者對象
22         consumer = new KafkaConsumer<String,String>(props);
23         consumer.subscribe(Arrays.asList(this.topic));
24         //死循環,持續消費kafka
25         while (true){
26             try {
27                //消費數據,並設置超時時間
28                 ConsumerRecords<String, String> records = consumer.poll(100);
29                 //Consumer message
30                 for (ConsumerRecord<String, String> record : records) {
31                     //Send message to every client
32                     for (WebSocket webSocket :wbSockets){
33                         webSocket.sendMessage(record.value());
34                     }
35                 }
36             }catch (IOException e){
37                 System.out.println(e.getMessage());
38                 continue;
39             }
40         }
41     }
42 
43     public void close() {
44         try {
45             consumer.close();
46         } catch (Exception e) {
47             System.out.println(e.getMessage());
48         }
49     }
50 
51     //供測試用,若通過tomcat啟動需通過其他方法啟動線程
52     public static void main(String[] args){
53         ConsumerKafka consumerKafka = new ConsumerKafka();
54         consumerKafka.start();
55     }
56 }
復制代碼
 
P.S. 需要注意的是WebSocket對tomcat版本是有要求的,筆者使用的是7.0.7.8。
 
四. 前端簡單實現
復制代碼
 1 <!DOCTYPE html>
 2 <html lang="en">
 3 <head>
 4     <meta charset="UTF-8">
 5     <title>WebSocket client</title>
 6     <script type="text/javascript">
 7         var socket;
 8         if (typeof (WebSocket) == "undefined"){
 9             alert("This explorer don't support WebSocket")
10         }
11 
12         function connect() {
13             //Connect WebSocket server
14             socket =new WebSocket("ws://127.0.0.1:8080/wbSocket");
15             //open
16             socket.onopen = function () {
17                 alert("WebSocket is open");
18             }
19             //Get message
20             socket.onmessage = function (msg) {
21                 alert("Message is " + msg);
22             }
23             //close
24             socket.onclose = function () {
25                 alert("WebSocket is closed");
26             }
27             //error
28             socket.onerror = function (e) {
29                 alert("Error is " + e);
30             }
31         }
32 
33         function close() {
34             socket.close();
35         }
36 
37         function sendMsg() {
38             socket.send("This is a client message ");
39         }
40     </script>
41 </head>
42 <body>
43     <button onclick="connect()">connect</button>
44     <button onclick="close()">close</button>
45     <button onclick="sendMsg()">sendMsg</button>
46 </body>
47 </html>
復制代碼
 
五. 結語
     以上基本可以實現將kafka數據實時推送到前端。這是筆者第一篇筆記,不足之處請指出、諒解。
     源碼:https://github.com/youtNa/webSocketkafka
   引用:1.  webSocket百度百科

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM