SpringBoot集成websocket發送后台日志到前台頁面


業務需求

后台為一個采集系統,需要將采集過程中產生的日志實時發送到前台頁面展示,以便了解采集過程。

技能點

  • SpringBoot 2.x
  • websocket
  • logback
  • thymeleaf
  • RabbitMQ

之所以使用到RabbitMQ是因為實際環境中采集服務為多個,為了統一處理日志信息,將日志都先灌入mq中,再統一從mq中進行消費

引入關鍵pom

<dependency>
	<groupId>org.springframework.boot</groupId>
	<artifactId>spring-boot-starter-web</artifactId>
</dependency>

<!--websocket -->
<dependency>
	<groupId>org.springframework.boot</groupId>
	<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>

<!--rabbitmq -->
<dependency>
	<groupId>org.springframework.boot</groupId>
	<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

logback配置文件引入AmqpAppender

<springProperty scope="context" name="rabbitmq-address" source="spring.rabbitmq.addresses" defaultValue="127.0.0.1:5672" />
<springProperty scope="context" name="rabbitmq-username" source="spring.rabbitmq.username" defaultValue="guest" />
<springProperty scope="context" name="rabbitmq-password" source="spring.rabbitmq.password" defaultValue="guest" />
<springProperty scope="context" name="rabbitmq-virtual-host" source="spring.rabbitmq.virtual-host" defaultValue="/" />
<springProperty scope="context" name="exhcange-name" source="platform.parameter.exhcangeName" defaultValue="default-exchange" />
<springProperty scope="context" name="binding-key" source="platform.parameter.bindingKey" defaultValue="default-routing" />

<appender name="RabbitMq"  class="org.springframework.amqp.rabbit.logback.AmqpAppender">
	<layout>
		<pattern>[%X{traceId}] - %d{HH:mm:ss.SSS} %-5level %logger{36} - %msg%n</pattern>     <!--<1>-->
	</layout>
	<!--rabbitmq地址 -->
	<addresses>${rabbitmq-address}</addresses>
	<username>${rabbitmq-username}</username>
	<password>${rabbitmq-password}</password>
	<virtualHost>${rabbitmq-username}</virtualHost>
	
	<declareExchange>false</declareExchange>
	<exchangeType>direct</exchangeType>
	<exchangeName>${exhcange-name}</exchangeName>
	<routingKeyPattern>${binding-key}</routingKeyPattern>
	<generateId>true</generateId>
	<charset>UTF-8</charset>
	<durable>true</durable>
	<deliveryMode>NON_PERSISTENT</deliveryMode>
	<filter class="com.log.websocket.stomp.LogFilter">  
		<level>INFO</level>
	</filter>
</appender>

<springProfile name="dev">
	<root level="debug">
		<appender-ref ref="RabbitMq" />
	</root>
</springProfile>

日志過濾器

logback配置文件中添加的AmqpAppender使用了filter,具體的filter如下所示:

public class LogFilter extends AbstractMatcherFilter<ILoggingEvent> {

	Level level;

	@Override
	public FilterReply decide(ILoggingEvent event) {
		if (!isStarted()) {
			return FilterReply.NEUTRAL;
		}
		//過濾指定級別的日志
		if(event.getLevel().equals(level)){
			Map<String, String> mdcMap = event.getMDCPropertyMap();
			String tracId = mdcMap.get("traceId");
			//過濾日志中帶有traceId的日志,其他的不需要,traceId使用aop添加
			if(StringUtils.isNotBlank(tracId)){
				return FilterReply.ACCEPT;
			}
		}
		return FilterReply.DENY;
	}

	public void setLevel(Level level) {
		this.level = level;
	}

	@Override
	public void start() {
		if (this.level != null) {
			super.start();
		}
	}
}

說明:

AmqpAppender中的filter設置了過濾級別,因此只過濾指定級別的日志;

過濾日志中帶有traceId的日志,traceId通過aop添加,具體參考后面的aop設置;

aop方式添加traceId

編寫LogAspect如下所示:

@Order(1)
@Aspect
@Component
public class LogAspect {

    /**
     * 所有的業務類的類名都是xxSpiderxxImpl,統一入口都是gatherData方法
     */
    @Pointcut("execution(* com.log..*.service..*Spider*Impl.gatherData(..))")
    public void pointCut() {}

    @Before("pointCut()")
    public void before(JoinPoint joinPoint){
        //切點已經確定是com.log..*.service..*Spider*Impl.gatherData(..),該方法的參數只有一個,且為GatherTaskVO
        GatherTaskVO vo = (GatherTaskVO)joinPoint.getArgs()[0];
        //將任務id作為traceId
        MDC.put("traceId", vo.getId());
    }

    @After("pointCut()")
    public void after(JoinPoint joinPoint){
        //方法執行完成以后,刪除traceId
        MDC.remove("traceId");
    }
}

解釋一下MDC:

對於多個線程同時執行的系統或者分布式系統中,各個線程的日志穿插執行,導致我們無法直觀的直接定位整個操作流程,因此,我們需要對一個線程的操作流程進行歸類標記,比如使用線程+時間戳或者用戶id等,從而使我們能夠從混亂的日志中梳理處整個線程的操作流程,因此Slf4j的MDC應運而生,logback和log4j支持MDC。

MDC中提供的方法如下所示;

package org.jboss.logging;

import java.util.Collections;
import java.util.Map;

/**
 * 刪除了非必須代碼以及注釋
 * Mapped diagnostic context. Each log provider implementation may behave different.
 */
public final class MDC {

   //uts the value onto the context.
    public static Object put(String key, Object val);

    //Returns the value for the key or {@code null} if no value was found.
    public static Object get(String key);

	//Removes the value from the context.
    public static void remove(String key);


   //Clears the message diagnostics context.
    public static void clear();
}

MDC提供的方法比較簡單,使用也很簡單,只需要將指定的值put到線程上下文中,在對應的地方調用get方法獲取到值即可。

注意看上述AmqpAppender配置中標記<1>中的traceId即為我們此處添加到線程上下文中的值,如下所示

<layout>
	<pattern>[%X{traceId}] - %d{HH:mm:ss.SSS} %-5level %logger{36} - %msg%n</pattern>
</layout>

開啟websocket支持

Springboot環境下注入ServerEndpointExporter以開啟websocket支持

@Configuration
public class WebSocketConfig {
    @Bean
    public ServerEndpointExporter serverEndpointExporter() {
        return new ServerEndpointExporter();
    }
}

websocketServer

websocketServer用來開啟連接,關閉連接以及接收消息等

@Slf4j
@ServerEndpoint("/socketserver/{taskId}")
@Component
public class WebSocketServer {
    /**concurrent包的線程安全Set,用來存放每個客戶端對應的MyWebSocket對象。*/
    private static ConcurrentHashMap<String,WebSocketServer> webSocketMap = new ConcurrentHashMap<>();
    /**與某個客戶端的連接會話,需要通過它來給客戶端發送數據*/
    private Session session;
    /**接收taskId*/
    private String taskId="";

    /**
     * 連接建立成功調用的方法*/
    @OnOpen
    public void onOpen(Session session,@PathParam("taskId") String taskId) {
        this.session = session;
        this.taskId=taskId;
        if(webSocketMap.containsKey(taskId)){
            webSocketMap.remove(taskId);
            webSocketMap.put(taskId,this);
        }else{
            webSocketMap.put(taskId,this);
        }
        try {
            sendMessage("socket連接成功");
        } catch (IOException e) {
            log.error("socket>>"+taskId+",網絡異常!!!!!!");
        }
    }

    /**
     * 連接關閉調用的方法
     */
    @OnClose
    public void onClose() {
        if(webSocketMap.containsKey(taskId)){
            webSocketMap.remove(taskId);
        }
    }

    /**
     * 收到客戶端消息后調用的方法
     * TODO 客戶端交互使用,暫無用到
     * @param message 客戶端發送過來的消息*/
    @OnMessage
    public void onMessage(String message, Session session) {
        log.info("socket>>>:"+taskId+",報文:"+message);
    }

    /**
     *
     * @param session
     * @param error
     */
    @OnError
    public void onError(Session session, Throwable error) {
        log.error("用戶錯誤:"+this.taskId+",原因:"+error.getMessage());
        error.printStackTrace();
    }
    /**
     * 實現服務器主動推送
     */
    public void sendMessage(String message) throws IOException {
        //加鎖,否則會出現java.lang.IllegalStateException: The remote endpoint was in state [TEXT_FULL_WRITING] which is an invalid state for called method異常,並發使用session發送消息導致的
        synchronized (this.session){
            this.session.getBasicRemote().sendText(message);
        }
    }

    public ConcurrentHashMap<String,WebSocketServer> getWebSocketMap(){ return webSocketMap; }
}

前台頁面

前台頁面使用js來調用websocket,請求websocketserver打開socket連接,並且開始和后台交互發送消息

<!DOCTYPE html >
<html xmlns:th="http://www.thymeleaf.org" >
<head>
    <meta charset="utf-8">
    <title>任務日志展示</title>
</head>
<body>
<script th:src="@{/js/jquery.min.js}"></script>
<input type="hidden" id="gather_task_id" th:value="${taskId}" />
<script>

    var socket;
    function openSocket() {
        var detailDiv = $("#log_detail");
        var taskId = $("#gather_task_id").val();
        //實現化WebSocket對象,指定要連接的服務器地址與端口  建立連接
        var socketUrl="http://localhost:8888/socketserver/"+taskId;
        socketUrl=socketUrl.replace("https","ws").replace("http","ws");
        if(socket!=null){
            socket.close();
            socket=null;
        }
        socket = new WebSocket(socketUrl);
        //打開事件
        socket.onopen = function() {
            console.log("websocket已打開");
        };
        //獲得消息事件
        socket.onmessage = function(msg) {
            console.log(msg.data);
            //發現消息進入    開始處理前端觸發邏輯
            detailDiv.append("<p>"+msg.data+"</p>")
        };
        //關閉事件
        socket.onclose = function() {
            console.log("websocket已關閉");
        };
        //發生了錯誤事件
        socket.onerror = function() {
            console.log("websocket發生了錯誤");
        }
    }
    function sendMessage() {
        if(typeof(WebSocket) == "undefined") {
            console.log("您的瀏覽器不支持WebSocket");
        }else {
            console.log("您的瀏覽器支持WebSocket");
            console.log('{"toUserId":"'+$("#toUserId").val()+'","contentText":"'+$("#contentText").val()+'"}');
            socket.send('{"toUserId":"'+$("#toUserId").val()+'","contentText":"'+$("#contentText").val()+'"}');
        }
    }


    function printLog(){
        if(typeof(WebSocket) == "undefined") {
            console.log("您的瀏覽器不支持WebSocket");
            alert("您的瀏覽器不支持WebSocket");
        }else {
            openSocket();
        }
    }

    function quit(){
        if(socket!=null){
            socket.close();
            socket=null;
            var detailDiv = $("#log_detail");
            detailDiv.append("<p>客戶端已退出</p>")
        }
    }
</script>

<a href="javascript:void(0);" onclick="printLog()" >打印日志</a>
<a href="javascript:void(0);" onclick="quit()">退出</a>
<div id="log_detail">

</div>
</body>

</html>

消費mq中的日志消息

service中產生的日志是添加到mq隊列中的,因此需要一個消費者消費隊列中的數據,並且使用websocketserver將消息發送到對應的頁面上,從而在頁面上進行展示

@Component
@Slf4j
public class LogConsumer {
    @Resource
    private WebSocketService webSocketService;

    @RabbitHandler
    @RabbitListener(
            bindings = @QueueBinding(
                    value = @Queue(name = "${platform.parameter.queueName}",durable = "true"),
                    exchange = @Exchange(name = "${platform.parameter.exhcangeName}",ignoreDeclarationExceptions="true",durable = "true"),
                    key = "${platform.parameter.bindingKey}"
            ),
            concurrency = "2"
    )
    public void listennerPush(String msg, Channel channel, Message message) throws IOException {
        try {
            log.debug("consumer>>>接收到的消息>>>{}",msg);
            //[1] - 13:15:17.484 - TwitterSpiderMobileService實現類方法<<<<任務id:1
            msg.split(" - ")[0].trim().replace("[","").replace("]","");
            String tracId =  msg.substring(0,msg.indexOf(" - ")).trim().replace("[","").replace("]","");
            msg = msg.substring(msg.indexOf(" - ")+2);
            //調用websocket發送日志信息到頁面上
            webSocketService.sendMessage(tracId,msg);
        } catch (Exception e) {
            log.error("獲取消息失敗,異常原因:{}",e.getMessage(),e);
        } finally {
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        }
    }
}

sendMessage方法如下所示:

@Override
public void sendMessage(String taskId, String logMessage) {
	try {
		ConcurrentHashMap<String, WebSocketServer> map =  webSocketServer.getWebSocketMap();
		WebSocketServer server =  map.get(taskId);
		if(server!=null){
			server.sendMessage(logMessage);
		}else{
			log.warn("客戶端已退出");
		}
	} catch (IOException e) {
		log.error("向客戶端發送消息時出現異常,異常原因:{}",e.getMessage(),e);
	}
}

最終效果圖

經過以上步驟即可將service中生成的日志接近實時的顯示在前台頁面上,最后的顯示效果如下所示:

參考資料

1.SpringBoot2.0集成WebSocket,實現后台向前端推送信息

本文所對應的代碼已上傳gitee,有需要的可以自行下載。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM