Springboot+WebSocket+Kafka(寫着玩的)


鬧着玩的來源:前台發送消息,後台接收並處理後發給 Kafka,Kafka 消費者接到消息後再傳給前台顯示,由此聯想到 WebSocket。

最終效果如圖:

頁面解釋:

不填寫內容的話,表單值默認為Topic、Greeting、Name

點擊訂閱,按鈕變黑

Send Topic 廣播 前台顯示前綴:T-You Send
Subscribe Topic 訂閱廣播 前台顯示前綴:A-You Receive、B-You Receive
Send Queue 廣播 前台顯示前綴:Q-You Send
Subscribe Queue 訂閱廣播 前台顯示前綴:C-You Receive、D-You Receive
Subscribe Point 訂閱點對點 前台顯示前綴:/user/110/queue/pushInfo,Receive
Send Kafka 點對點 前台顯示前綴:Kafka Send
Receive Kafka 訂閱點對點 前台顯示前綴:Kafka Receive

 

 

 

 

 

 

 

重要提示:欲接收消息,請先點擊對應的訂閱按鈕。

關鍵代碼:

配置websocket

package com.example.demo.conf;

import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.simp.config.MessageBrokerRegistry;
import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker;
import org.springframework.web.socket.config.annotation.StompEndpointRegistry;
import org.springframework.web.socket.config.annotation.WebSocketMessageBrokerConfigurer;

/**
 * STOMP-over-WebSocket configuration for the boot-kafka demo.
 *
 * <p>Registers the SockJS-enabled endpoint that browsers connect to and sets up
 * the in-memory simple broker together with the application and user
 * destination prefixes.
 *
 * <p>Author: 001977, created 2018-07-11 11:30.
 */
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfigurationWithSTOMP implements WebSocketMessageBrokerConfigurer {

    /** Endpoint path the browser opens its SockJS connection against. */
    private static final String CLIENT_ENDPOINT = "/clientConnectThis";

    /** Origin pattern accepted during the handshake; "*" accepts every origin. */
    private static final String ANY_ORIGIN = "*";

    /** Prefix of destinations routed to annotated handler methods (client to server). */
    private static final String APPLICATION_PREFIX = "/app";

    /** Prefix of user-specific destinations (server to client). */
    private static final String USER_PREFIX = "/user";

    /**
     * Applies the destination prefixes and enables the simple broker.
     *
     * @param config registry used to configure message routing
     */
    @Override
    public void configureMessageBroker(MessageBrokerRegistry config) {
        config.setApplicationDestinationPrefixes(APPLICATION_PREFIX);
        config.setUserDestinationPrefix(USER_PREFIX);
        // Point-to-point delivery relies on the /user prefix, broadcast on /topic.
        config.enableSimpleBroker("/topic", "/queue", USER_PREFIX);
    }

    /**
     * Exposes the STOMP endpoint with SockJS support.
     *
     * @param registry registry the endpoint is added to
     */
    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        registry
                .addEndpoint(CLIENT_ENDPOINT)
                .setAllowedOrigins(ANY_ORIGIN)
                .withSockJS();
    }
}

 

 

controller

package com.example.demo.controller;

import com.example.demo.entity.Welcome;
import com.example.demo.service.KafkaService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.handler.annotation.MessageMapping;
import org.springframework.messaging.handler.annotation.SendTo;
import org.springframework.messaging.simp.SimpMessagingTemplate;
import org.springframework.messaging.simp.annotation.SendToUser;
import org.springframework.messaging.simp.annotation.SubscribeMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.servlet.ModelAndView;

/**
 * HTTP and STOMP entry points of the boot-kafka demo.
 *
 * <p>Serves the demo page over HTTP and handles the STOMP messages sent by the
 * page to the {@code /app/...} destinations.
 *
 * <p>The message-handling methods take their {@link Welcome} argument without
 * any annotation: {@code @RequestBody} is a Spring MVC (HTTP) annotation and has
 * no meaning on {@code @MessageMapping} methods, where an un-annotated argument
 * is resolved from the message payload.
 *
 * <p>Author: 001977, created 2018-07-11 11:00.
 */
@RestController
public class SimpleController {

    @Autowired
    private KafkaService kafkaService;

    @Autowired
    private SimpMessagingTemplate simpMessagingTemplate;

    /**
     * Renders the demo page.
     *
     * @return a model-and-view pointing at the {@code stomp_home} view
     */
    @RequestMapping("/")
    public ModelAndView stomp() {
        return new ModelAndView("stomp_home");
    }

    /**
     * Answers a subscription to {@code /app/firstConnection}.
     *
     * @return a fixed "Thank You!" welcome object
     */
    @SubscribeMapping("/firstConnection")
    public Welcome thanks() {
        return new Welcome("...", "Thank You!");
    }

    /**
     * Handles messages sent to {@code /app/sendMessageTopic} and forwards them
     * to {@code /topic/webSocketTopic}.
     *
     * @param welcome the message payload
     * @return the same payload, unchanged
     */
    @MessageMapping("/sendMessageTopic")
    @SendTo("/topic/webSocketTopic")
    public Welcome sendToTopic(Welcome welcome) {
        System.out.println("Send-Topic-Msg:" + welcome);
        return welcome;
    }

    /**
     * Handles messages sent to {@code /app/sendMessageQueue} and forwards them
     * to the user destination {@code /queue/webSocketQueue}.
     *
     * @param welcome the message payload
     * @return the same payload, unchanged
     */
    @MessageMapping("/sendMessageQueue")
    @SendToUser("/queue/webSocketQueue")
    public Welcome sendToQueue(Welcome welcome) {
        System.out.println("Send-Queue-Msg:" + welcome);
        return welcome;
    }

    /**
     * Point-to-point demo: pushes a fixed message to user "110" on
     * {@code /queue/pushInfo}. To have the backend push periodically,
     * re-enable the {@code @Scheduled} annotation below.
     */
    //@Scheduled(fixedRate = 1000L)
    public void send() {
        Welcome welcome = new Welcome("110", "Hello!");
        simpMessagingTemplate.convertAndSendToUser("110", "/queue/pushInfo", welcome);
        System.err.println(welcome);
    }

    /**
     * Handles messages sent to {@code /app/sendKafka} by handing the payload to
     * {@link KafkaService#send}.
     *
     * <p>NOTE(review): there is no {@code @SendTo} here, so the returned value
     * presumably goes to Spring's default destination for this mapping
     * ({@code /topic/sendKafka}) — confirm whether that is intended.
     *
     * @param welcome the message payload
     * @return the same payload, unchanged
     */
    @MessageMapping("/sendKafka")
    public Welcome sendToKafka(Welcome welcome) {
        // The boolean result is treated as a success flag, as the original log line implies.
        boolean sent = kafkaService.send(welcome);
        if (sent) {
            System.out.println("Send-Kafka-Msg:" + welcome);
        } else {
            // Previously a rejected send left no trace at all.
            System.err.println("Send-Kafka-Failed:" + welcome);
        }
        return welcome;
    }

}

 

前端JS

// Open a SockJS connection to the server endpoint and run a STOMP client over it.
var socket = new SockJS('/clientConnectThis');
var stompClient = Stomp.over(socket);

stompClient.connect(
    {},
    function (frame) {
        // Connected: report it, then subscribe once to the greeting destination.
        connectResult("Connect Success");
        stompClient.subscribe('/app/firstConnection', function (reply) {
            var greeting = JSON.parse(reply.body).greeting;
            receiveText("/app/firstConnection,Test Receive:" + greeting);
        });
    },
    function (error) {
        // Connection failed or was lost.
        connectResult("Connect Break");
    }
);

// Send the form contents to /app/sendMessageTopic and echo them in the "sent" list.
function sendTopic() {
    // Reads one form field; an empty field falls back to the given placeholder.
    var readField = function (selector, placeholder) {
        var value = $(selector).val();
        return value == "" ? placeholder : value;
    };
    var payload = JSON.stringify({
        "topic": readField('#topic', "Topic"),
        "greeting": readField('#message', "Greeting"),
        "name": readField('#name', "Name")
    });
    stompClient.send("/app/sendMessageTopic", {}, payload);
    sendText("T-You Send:" + payload);
}
// Send the form contents to /app/sendMessageQueue and echo them in the "sent" list.
function sendQueue() {
    // Reads one form field; an empty field falls back to the given placeholder.
    var readField = function (selector, placeholder) {
        var value = $(selector).val();
        return value == "" ? placeholder : value;
    };
    var payload = JSON.stringify({
        "topic": readField('#topic', "Topic"),
        "greeting": readField('#message', "Greeting"),
        "name": readField('#name', "Name")
    });
    stompClient.send("/app/sendMessageQueue", {}, payload);
    sendText("Q-You Send:" + payload);
}

// Subscribe twice (handlers A and B) to /topic/webSocketTopic and mark the button as used.
function subscribeTopic(t) {
    $(t).css("backgroundColor", "#000");

    // Builds a frame handler that renders the payload behind the given prefix.
    var showWith = function (prefix) {
        return function (frame) {
            var msg = JSON.parse(frame.body);
            receiveText(prefix + msg.name + ",greeting=" + msg.greeting + ",topic=" + msg.topic + ")")
        };
    };

    stompClient.subscribe('/topic/webSocketTopic', showWith("A-You Receive:(name="));
    stompClient.subscribe('/topic/webSocketTopic', showWith("B-You Receive:(name="));
}

// Subscribe twice (handlers C and D) to /user/queue/webSocketQueue and mark the button as used.
function subscribeQueue(t) {
    $(t).css("backgroundColor", "#000");

    // Builds a frame handler that renders the payload behind the given prefix.
    var showWith = function (prefix) {
        return function (frame) {
            var msg = JSON.parse(frame.body);
            receiveText(prefix + msg.name + ",greeting=" + msg.greeting + ",topic=" + msg.topic + ")")
        };
    };

    stompClient.subscribe('/user/queue/webSocketQueue', showWith("C-You Receive:(name="));
    stompClient.subscribe('/user/queue/webSocketQueue', showWith("D-You Receive:(name="));
}

// Subscribe to the point-to-point destination of user 110 and mark the button as used.
function subscribePoint(t) {
    $(t).css("backgroundColor", "#000");

    // Destination pattern: /user/{userId}/**
    var destination = '/user/110/queue/pushInfo';
    stompClient.subscribe(destination, function (frame) {
        var greeting = JSON.parse(frame.body).greeting;
        receiveText("/user/110/queue/pushInfo,Receive:" + greeting);
    });
}

// Send the form contents to /app/sendKafka and echo them in the "sent" list.
function sendKafka() {
    // Reads one form field; an empty field falls back to the given placeholder.
    var readField = function (selector, placeholder) {
        var value = $(selector).val();
        return value == "" ? placeholder : value;
    };
    var payload = JSON.stringify({
        "topic": readField('#topic', "Topic"),
        "greeting": readField('#message', "Greeting"),
        "name": readField('#name', "Name")
    });
    stompClient.send("/app/sendKafka", {}, payload);
    sendText("Kafka Send:" + payload);
}

// Subscribe to the destination Kafka messages are delivered on and mark the button as used.
function kafkaReceive(t) {
    $(t).css("backgroundColor", "#000");

    var destination = '/user/kafka-user-id/queue/kafkaMsg';
    stompClient.subscribe(destination, function (frame) {
        var msg = JSON.parse(frame.body);
        receiveText("Kafka Receive:(name=" + msg.name + ",greeting=" + msg.greeting + ",topic=" + msg.topic + ")")
    });
}


// Append one "sent" entry to the message list.
// v carries user-typed form values, so it is inserted as text rather than
// concatenated into markup; any HTML in it is shown literally, not parsed.
function sendText(v) {
    var entry = $('<div class="common-message send-text"></div>').text(v);
    $('.container .right').append(entry);
}

// Append one "received" entry to the message list.
// v carries payload fields received from the broker (i.e. other clients' input),
// so it is inserted as text rather than concatenated into markup; any HTML in
// it is shown literally, not parsed.
function receiveText(v) {
    var entry = $('<div class="common-message receive-text"></div>').text(v);
    $('.container .right').append(entry);
}

// Show the connection status line inside the page container.
function connectResult(v) {
    var statusMarkup = '<div class="connect-text">' + v + '</div>';
    var statusNode = $(statusMarkup);
    $('.container').append(statusNode)
}

 

 

其餘代碼見 GitHub。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM