Spring Boot 下 Kafka和Websocket通信



{
"Spring Boot 版本":"2.5.2",
""
}

0.(pom.xml)配置文件添加依賴項



<!-- 我是沒有引用這個{javaee-api},好像spring-boot已經包含了類似這樣的基礎類庫... -->
<dependency>
	<groupId>javax</groupId>
	<artifactId>javaee-api</artifactId>
	<version>7.0</version>
	<scope>provided</scope>
</dependency>


<!-- 我的這個案例中,這個是必選依賴項(基於其它的插件也是可以實現,不在本文討論范圍內),springboot的高級組件會自動引用基礎的組件,像spring-boot-starter-websocket就引入了spring-boot-starter-web和spring-boot-starter -->
<dependency>
	<groupId>org.springframework.boot</groupId>
	<artifactId>spring-boot-starter-websocket</artifactId>
	<version>1.3.5.RELEASE</version>
</dependency>


依賴項這里我搞了一上午才搞定,各種問題,各種依賴關系沖突,
比如:明明在pom.xml中添加了對應的包{
spring-boot-starter-websocket},並且刷新了Maven,
結果在程序中就是訪問不到對應的類{ServerEndpointExporter}

1.注入ServerEndpointExporter到容器中,為了后續使用注解{@ServerEndpoint}做准備

本案例中首先要注入ServerEndpointExporter,這個bean會自動注冊使用了@ServerEndpoint注解聲明的Websocket endpoint.
注意,如果使用獨立的servlet容器,而不是直接使用springboot的內置容器,就不要注入ServerEndpointExporter,因為它將由容器自己提供和管理

//文件 - "WebSocketConfigOne.java"
@Configuration
public class WebSocketConfigOne {
    /*
     * 這個bean會自動注冊使用了@ServerEndpoint注解聲明的對象
     * 沒有的話會報404
     * @return
     */
    @Bean
    public ServerEndpointExporter serverEndpointExporter() {
        return new ServerEndpointExporter();
    }
}

2.WebSocket具體實現類

前端需要建立連接的webSocket實現

  • 連接.建立
  • 連接.關閉
  • 服務端向客戶端.發送消息
  • 客戶端向服務端.發送消息

//文件 - "MyWebSocket.java"
@Component
//此處定義接口的uri
@ServerEndpoint("/wbSocket")
public class MyWebSocket {
    private Session session;
    public static CopyOnWriteArraySet<MyWebSocket> myWebSockets = new CopyOnWriteArraySet<MyWebSocket>(); //此處定義靜態變量,以在其他方法中獲取到所有連接

    /**
     * 建立連接。
     * 建立連接時入參為session
     */
    @OnOpen
    public void onOpen(Session session){
        this.session = session;
        myWebSockets.add(this); //將此對象存入集合中以在之后廣播用,如果要實現一對一訂閱,則類型對應為Map。由於這里廣播就可以了隨意用Set
        System.out.println("New session insert,sessionId is "+ session.getId());
    }
    /**
     * 關閉連接
     */
    @OnClose
    public void onClose(){
        myWebSockets.remove(this);//將socket對象從集合中移除,以便廣播時不發送次連接。如果不移除會報錯(需要測試)
        System.out.println("A session insert,sessionId is "+ session.getId());
    }
    /**
     * 接收前端傳過來的數據。
     * 雖然在實現推送邏輯中並不需要接收前端數據,但是作為一個webSocket的教程或叫備忘,還是將接收數據的邏輯加上了。
     */
    @OnMessage
    public void onMessage(String message ,Session session){
        System.out.println(message + "from " + session.getId());
    }

    public void sendMessage(String message) throws IOException {
        this.session.getBasicRemote().sendText(message);
    }
}

3.Kafka消費者實現類

本案例中,Kafka消費的消息會直接通過和前端WebSocket的連接顯示到前端


//文件 - "ConsumerKafka.java"
/*
Kafka
 */
public class ConsumerKafka extends Thread {

    private KafkaConsumer<String,String> consumer;
    //Kafka.主題
    private String topic = "topid_kafka";

    public ConsumerKafka(){
    }

    /*
    啟動
     */
    @Override
    public void run(){
        //加載kafka消費者參數
        Properties props = new Properties();
        props.put("bootstrap.servers", "127.0.0.1:9092");
        props.put("group.id", "groupId_kafka");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("session.timeout.ms", "15000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        //創建消費者對象
        consumer = new KafkaConsumer<String,String>(props);
        consumer.subscribe(Arrays.asList(this.topic));
        //死循環,持續消費kafka
        while (true){
            try {
                //消費數據,並設置超時時間
                ConsumerRecords<String, String> records = consumer.poll(100);
                //Consumer message
                for (ConsumerRecord<String, String> record : records) {
                    //注意,變量{myWebSockets}存在於類文件{MyWebSocket.cs}中
                    for (MyWebSocket myWebSocket :myWebSockets){
                        myWebSocket.sendMessage(record.value());
                    }
                }
            }catch (IOException e){
                System.out.println(e.getMessage());
                continue;
            }
        }
    }

    /*
    關閉
     */
    public void close() {
        try {
            consumer.close();
        } catch (Exception e) {
            System.out.println(e.getMessage());
        }
    }
}

4.main函數修改

將Kafka消費者實例化


//文件 - "Application.java"
@SpringBootApplication
public class Application {
    public static void main(String[] args) {
        //將Kafka消費者實例化,運行起來....
        ConsumerKafka consumerKafka = new ConsumerKafka();
        consumerKafka.start();
        SpringApplication.run(Application.class, args);
    }
}

5.前端WebSocket代碼


<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>WebSocket client</title>
    <script type="text/javascript">
        var socket;
        if (typeof (WebSocket) == "undefined"){
			setMessageInnerHTML("This explorer don't support WebSocket");
        }

        function connect() {
            //Connect WebSocket server
            socket =new WebSocket("ws://127.0.0.1:8080/wbSocket");
			
            //open
            socket.onopen = function () {
                setMessageInnerHTML("open");
            }
            //Get message
            socket.onmessage = function (msg) {
				var data = JSON.parse(msg.data)
				setMessageInnerHTML("服務端消息:" + data);
            }
            //close
            socket.onclose = function () {				
				setMessageInnerHTML("連接已關閉...");
            }
            //error
            socket.onerror = function (e) {
				setMessageInnerHTML("錯誤:"+JSON.stringify(e));
            }
        }

        function close() {
			setMessageInnerHTML("關閉連接...");
            socket.close();
        }

        function sendMsg() {
			var clientMsg = document.getElementById('id_clientMsg').value;
            socket.send("This is a client message :" + clientMsg);
        }
		
		//將消息顯示在網頁上
		function setMessageInnerHTML(innerHTML){
			document.getElementById('id_msgRecord').innerHTML += innerHTML + '<br/>';
		}
    </script>
</head>
<body>
	<!-- 連接 -->
    <button onclick="connect()">connect</button>
	<!-- 關閉連接 -->
    <button onclick="close()">close</button>
	<!-- 客戶端消息發送 -->
    <button onclick="sendMsg()">sendMsg</button>
	
	<!-- 客戶端消息 -->
	<input id="id_clientMsg" type="text" />
	
	<!-- 消息記錄 -->
	<div id="id_msgRecord">
	</div>
</body>
</html>

總結

坑集合

  • [x] 坑1 -- pom.xml中各種依賴的關系,搞得有些依賴明明已經寫入了pom.xml中,但是依然使用不了 ...
  • [X] 坑2 -- 剛開始WebSocket一度連接不上,后來添加上文件{WebSocketConfigOne.java}就可以了
  • [X] 坑3 -- Kafka消費者剛開始一直忘了實例化...

其它

擴展

  • WebSocket和Socket什么關系?
  • HTTP和WebSocket什么關系?


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM