一. 需求背景
最近新接觸一個需求,需要將kafka中的數據實時推送到前端展示。最開始想到的是前端輪詢接口數據,但是無法保證輪詢的頻率和消費的頻率完全一致,或造成數據缺失等問題。最終確定用利用WebSocket實現數據的實時推送。
二. websocket簡介
網上已經有好多介紹WebSocket的文章了,就不詳細介紹了,這里只做簡單介紹。 WebSocket協議是基於TCP的一種新的網絡協議。它實現了瀏覽器與服務器全雙工(full-duplex)通信——允許服務器主動發送信息給客戶端。
三. 服務端實現
1. pom文件
這里需要引用三個依賴。第一個為WebSocket需要的依賴,另外兩個為kafka的依賴
<dependencies>
<!-- webSocket所需依賴 -->
<dependency>
<groupId>javax</groupId>
<artifactId>javaee-api</artifactId>
<version>7.0</version>
</dependency>
<!-- kafka 所需依賴 -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.9.2</artifactId>
<version>0.8.1.1</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>RELEASE</version>
</dependency>
</dependencies>
2. webSocket服務端實現
1 //此處定義接口的uri
2 @ServerEndpoint("/wbSocket")
3 public class WebSocket {
4 private Session session;
5 public static CopyOnWriteArraySet<WebSocket> wbSockets = new CopyOnWriteArraySet<WebSocket>(); //此處定義靜態變量,以在其他方法中獲取到所有連接
6
7 /**
8 * 建立連接。
9 * 建立連接時入參為session
10 */
11 @OnOpen
12 public void onOpen(Session session){
13 this.session = session;
14 wbSockets.add(this); //將此對象存入集合中以在之后廣播用,如果要實現一對一訂閱,則類型對應為Map。由於這里廣播就可以了隨意用Set
15 System.out.println("New session insert,sessionId is "+ session.getId());
16 }
17 /**
18 * 關閉連接
19 */
20 @OnClose
21 public void onClose(){
22 wbSockets.remove(this);//將socket對象從集合中移除,以便廣播時不發送次連接。如果不移除會報錯(需要測試)
23 System.out.println("A session insert,sessionId is "+ session.getId());
24 }
25 /**
26 * 接收前端傳過來的數據。
27 * 雖然在實現推送邏輯中並不需要接收前端數據,但是作為一個webSocket的教程或叫備忘,還是將接收數據的邏輯加上了。
28 */
29 @OnMessage
30 public void onMessage(String message ,Session session){
31 System.out.println(message + "from " + session.getId());
32 }
33
34 public void sendMessage(String message) throws IOException {
35 this.session.getBasicRemote().sendText(message);
36 }
37 }
3. kafka消費者實現
1 public class ConsumerKafka extends Thread {
2
3 private KafkaConsumer<String,String> consumer;
4 private String topic = "kafkaTopic";
5
6 public ConsumerKafka(){
7
8 }
9
10 @Override
11 public void run(){
12 //加載kafka消費者參數
13 Properties props = new Properties();
14 props.put("bootstrap.servers", "localhost:9092");
15 props.put("group.id", "ytna");
16 props.put("enable.auto.commit", "true");
17 props.put("auto.commit.interval.ms", "1000");
18 props.put("session.timeout.ms", "15000");
19 props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
20 props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
21 //創建消費者對象
22 consumer = new KafkaConsumer<String,String>(props);
23 consumer.subscribe(Arrays.asList(this.topic));
24 //死循環,持續消費kafka
25 while (true){
26 try {
27 //消費數據,並設置超時時間
28 ConsumerRecords<String, String> records = consumer.poll(100);
29 //Consumer message
30 for (ConsumerRecord<String, String> record : records) {
31 //Send message to every client
32 for (WebSocket webSocket :wbSockets){
33 webSocket.sendMessage(record.value());
34 }
35 }
36 }catch (IOException e){
37 System.out.println(e.getMessage());
38 continue;
39 }
40 }
41 }
42
43 public void close() {
44 try {
45 consumer.close();
46 } catch (Exception e) {
47 System.out.println(e.getMessage());
48 }
49 }
50
51 //供測試用,若通過tomcat啟動需通過其他方法啟動線程
52 public static void main(String[] args){
53 ConsumerKafka consumerKafka = new ConsumerKafka();
54 consumerKafka.start();
55 }
56 }
P.S. 需要注意的是WebSocket對tomcat版本是有要求的,筆者使用的是7.0.7.8。
四. 前端簡單實現
1 <!DOCTYPE html>
2 <html lang="en">
3 <head>
4 <meta charset="UTF-8">
5 <title>WebSocket client</title>
6 <script type="text/javascript">
7 var socket;
8 if (typeof (WebSocket) == "undefined"){
9 alert("This explorer don't support WebSocket")
10 }
11
12 function connect() {
13 //Connect WebSocket server
14 socket =new WebSocket("ws://127.0.0.1:8080/wbSocket");
15 //open
16 socket.onopen = function () {
17 alert("WebSocket is open");
18 }
19 //Get message
20 socket.onmessage = function (msg) {
21 alert("Message is " + msg);
22 }
23 //close
24 socket.onclose = function () {
25 alert("WebSocket is closed");
26 }
27 //error
28 socket.onerror = function (e) {
29 alert("Error is " + e);
30 }
31 }
32
33 function close() {
34 socket.close();
35 }
36
37 function sendMsg() {
38 socket.send("This is a client message ");
39 }
40 </script>
41 </head>
42 <body>
43 <button onclick="connect()">connect</button>
44 <button onclick="close()">close</button>
45 <button onclick="sendMsg()">sendMsg</button>
46 </body>
47 </html>
五. 結語
以上基本可以實現將kafka數據實時推送到前端。這是筆者第一篇筆記,不足之處請指出、諒解。
源碼:https://github.com/youtNa/webSocketkafka
引用:1.
webSocket百度百科

