{
"Spring Boot 版本":"2.5.2",
""
}
0.(pom.xml)配置文件添加依賴項
<!-- 我是沒有引用這個{javaee-api},好像spring-boot已經包含了類似這樣的基礎類庫... -->
<dependency>
<groupId>javax</groupId>
<artifactId>javaee-api</artifactId>
<version>7.0</version>
<scope>provided</scope>
</dependency>
<!-- 我的這個案例中,這個是必選依賴項(基於其它的插件也是可以實現,不在本文討論范圍內),springboot的高級組件會自動引用基礎的組件,像spring-boot-starter-websocket就引入了spring-boot-starter-web和spring-boot-starter -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
<version>1.3.5.RELEASE</version>
</dependency>
依賴項這里我搞了一上午才搞定,各種問題,各種依賴關系沖突,
比如:明明在pom.xml中添加了對應的包{
spring-boot-starter-websocket},並且刷新了Maven,
結果在程序中就是訪問不到對應的類{ServerEndpointExporter}
1.注入ServerEndpointExporter到容器中,為了后續使用注解{@ServerEndpoint}做准備
本案例中首先要注入ServerEndpointExporter,這個bean會自動注冊使用了@ServerEndpoint注解聲明的Websocket endpoint.
注意,如果使用獨立的servlet容器,而不是直接使用springboot的內置容器,就不要注入ServerEndpointExporter,因為它將由容器自己提供和管理
//文件 - "WebSocketConfigOne.java"
@Configuration
public class WebSocketConfigOne {
/*
* 這個bean會自動注冊使用了@ServerEndpoint注解聲明的對象
* 沒有的話會報404
* @return
*/
@Bean
public ServerEndpointExporter serverEndpointExporter() {
return new ServerEndpointExporter();
}
}
2.WebSocket具體實現類
前端需要建立連接的webSocket實現
- 連接.建立
- 連接.關閉
- 服務端向客戶端.發送消息
- 客戶端向服務端.發送消息
//文件 - "MyWebSocket.java"
@Component
//此處定義接口的uri
@ServerEndpoint("/wbSocket")
public class MyWebSocket {
private Session session;
public static CopyOnWriteArraySet<MyWebSocket> myWebSockets = new CopyOnWriteArraySet<MyWebSocket>(); //此處定義靜態變量,以在其他方法中獲取到所有連接
/**
* 建立連接。
* 建立連接時入參為session
*/
@OnOpen
public void onOpen(Session session){
this.session = session;
myWebSockets.add(this); //將此對象存入集合中以在之后廣播用,如果要實現一對一訂閱,則類型對應為Map。由於這里廣播就可以了隨意用Set
System.out.println("New session insert,sessionId is "+ session.getId());
}
/**
* 關閉連接
*/
@OnClose
public void onClose(){
myWebSockets.remove(this);//將socket對象從集合中移除,以便廣播時不發送次連接。如果不移除會報錯(需要測試)
System.out.println("A session insert,sessionId is "+ session.getId());
}
/**
* 接收前端傳過來的數據。
* 雖然在實現推送邏輯中並不需要接收前端數據,但是作為一個webSocket的教程或叫備忘,還是將接收數據的邏輯加上了。
*/
@OnMessage
public void onMessage(String message ,Session session){
System.out.println(message + "from " + session.getId());
}
public void sendMessage(String message) throws IOException {
this.session.getBasicRemote().sendText(message);
}
}
3.Kafka消費者實現類
本案例中,Kafka消費的消息會直接通過和前端WebSocket的連接顯示到前端
//文件 - "ConsumerKafka.java"
/*
Kafka
*/
public class ConsumerKafka extends Thread {
private KafkaConsumer<String,String> consumer;
//Kafka.主題
private String topic = "topid_kafka";
public ConsumerKafka(){
}
/*
啟動
*/
@Override
public void run(){
//加載kafka消費者參數
Properties props = new Properties();
props.put("bootstrap.servers", "127.0.0.1:9092");
props.put("group.id", "groupId_kafka");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("session.timeout.ms", "15000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
//創建消費者對象
consumer = new KafkaConsumer<String,String>(props);
consumer.subscribe(Arrays.asList(this.topic));
//死循環,持續消費kafka
while (true){
try {
//消費數據,並設置超時時間
ConsumerRecords<String, String> records = consumer.poll(100);
//Consumer message
for (ConsumerRecord<String, String> record : records) {
//注意,變量{myWebSockets}存在於類文件{MyWebSocket.cs}中
for (MyWebSocket myWebSocket :myWebSockets){
myWebSocket.sendMessage(record.value());
}
}
}catch (IOException e){
System.out.println(e.getMessage());
continue;
}
}
}
/*
關閉
*/
public void close() {
try {
consumer.close();
} catch (Exception e) {
System.out.println(e.getMessage());
}
}
}
4.main函數修改
將Kafka消費者實例化
//文件 - "Application.java"
@SpringBootApplication
public class Application {
public static void main(String[] args) {
//將Kafka消費者實例化,運行起來....
ConsumerKafka consumerKafka = new ConsumerKafka();
consumerKafka.start();
SpringApplication.run(Application.class, args);
}
}
5.前端WebSocket代碼
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>WebSocket client</title>
<script type="text/javascript">
var socket;
if (typeof (WebSocket) == "undefined"){
setMessageInnerHTML("This explorer don't support WebSocket");
}
function connect() {
//Connect WebSocket server
socket =new WebSocket("ws://127.0.0.1:8080/wbSocket");
//open
socket.onopen = function () {
setMessageInnerHTML("open");
}
//Get message
socket.onmessage = function (msg) {
var data = JSON.parse(msg.data)
setMessageInnerHTML("服務端消息:" + data);
}
//close
socket.onclose = function () {
setMessageInnerHTML("連接已關閉...");
}
//error
socket.onerror = function (e) {
setMessageInnerHTML("錯誤:"+JSON.stringify(e));
}
}
function close() {
setMessageInnerHTML("關閉連接...");
socket.close();
}
function sendMsg() {
var clientMsg = document.getElementById('id_clientMsg').value;
socket.send("This is a client message :" + clientMsg);
}
//將消息顯示在網頁上
function setMessageInnerHTML(innerHTML){
document.getElementById('id_msgRecord').innerHTML += innerHTML + '<br/>';
}
</script>
</head>
<body>
<!-- 連接 -->
<button onclick="connect()">connect</button>
<!-- 關閉連接 -->
<button onclick="close()">close</button>
<!-- 客戶端消息發送 -->
<button onclick="sendMsg()">sendMsg</button>
<!-- 客戶端消息 -->
<input id="id_clientMsg" type="text" />
<!-- 消息記錄 -->
<div id="id_msgRecord">
</div>
</body>
</html>
總結
坑集合
- [x] 坑1 -- pom.xml中各種依賴的關系,搞得有些依賴明明已經寫入了pom.xml中,但是依然使用不了 ...
- [X] 坑2 -- 剛開始WebSocket一度連接不上,后來添加上文件{WebSocketConfigOne.java}就可以了
- [X] 坑3 -- Kafka消費者剛開始一直忘了實例化...
其它
擴展
- WebSocket和Socket什么關系?
- HTTP和WebSocket什么關系?