java-websocket客戶端 斷線重連 注入Service問題


java版客戶端:

使用開源項目java-websocket, github地址: https://github.com/TooTallNate/Java-WebSocket

github上有很多示例,具體可以去查看

此處主要是記錄java-websocket實現客戶端,並解決無法使用Service層方法(service為null)的問題,以及斷線重連

引用包

<dependency>
    <groupId>org.java-websocket</groupId>
    <artifactId>Java-WebSocket</artifactId>
    <version>1.3.9</version>
</dependency>

第一版,使用getBean獲取Service層方法,並且實現斷線重連

使用的是GitHub上的demo示例

import com.alibaba.fastjson.JSONArray;import com.sensor.vibration.utils.ApplicationContextRegister;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.java_websocket.client.WebSocketClient;
import org.java_websocket.drafts.Draft;
import org.java_websocket.handshake.ServerHandshake;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;

import java.net.URI;
import java.util.Map;

/** This example demonstrates how to create a websocket connection to a server. Only the most important callbacks are overloaded. */

@Slf4j
public class SensorWebSocketClient extends WebSocketClient {
    @Autowired
    private UserService userService;

    public SensorWebSocketClient(URI serverUri , Draft draft ) {
        super( serverUri, draft );
    }

    public SensorWebSocketClient(URI serverURI ) {
        super( serverURI );
    }

    public SensorWebSocketClient(URI serverUri, Map<String, String> httpHeaders ) {
        super(serverUri, httpHeaders);
    }

    @Override
    public void onOpen( ServerHandshake handshakedata ) {
        System.out.println( "opened connection" );
        // if you plan to refuse connection based on ip or httpfields overload: onWebsocketHandshakeReceivedAsClient
    }

    @Override
    public void onMessage( String msg ) {
        log.info("[websocket] 收到消息={}",msg);
    }

    @Override
    public void onClose( int code, String reason, boolean remote ) {
        // The codecodes are documented in class org.java_websocket.framing.CloseFrame
        System.out.println( "Connection closed by " + ( remote ? "remote peer" : "us" ) + " Code: " + code + " Reason: " + reason );
    }

    @Override
    public void onError( Exception ex ) {
        ex.printStackTrace();
        // if the error is fatal then onClose will be called additionally
    }

}

新建一個類,創建一個方法,啟動websocket

import java.net.URI;
import java.net.URISyntaxException;


/**
 * Opens the sensor WebSocket connection and keeps it alive with a background
 * heartbeat thread that reconnects when a send fails.
 */
public class ReconnectClient {
    /** Pause between two heartbeat attempts, in milliseconds. */
    private static final long HEARTBEAT_INTERVAL_MS = 3000L;

    /**
     * Connects to {@code ws://localhost:5005/websocket} using the blocking
     * connect call, then starts a thread that sends an empty text frame every
     * three seconds. When the send fails, {@code reconnect()} is invoked on the
     * client and the loop continues.
     *
     * @throws URISyntaxException   if the hard-coded server address is not a valid URI
     * @throws InterruptedException if the calling thread is interrupted while connecting
     */
    public static void reconnect() throws URISyntaxException, InterruptedException {
        final SensorWebSocketClient c = new SensorWebSocketClient(new URI("ws://localhost:5005/websocket"));
        c.connectBlocking();

        Thread heartbeat = new Thread(new Runnable() {
            @Override
            public void run() {
                while (!Thread.currentThread().isInterrupted()) {
                    try {
                        Thread.sleep(HEARTBEAT_INTERVAL_MS);
                        c.send("");
                    } catch (InterruptedException e) {
                        // An interrupt is a request to stop, not a lost connection:
                        // restore the flag and leave the loop instead of reconnecting.
                        Thread.currentThread().interrupt();
                        return;
                    } catch (RuntimeException e) {
                        // The send failed, so try to re-establish the connection.
                        c.reconnect();
                    }
                }
            }
        }, "websocket-heartbeat");
        heartbeat.start();
    }
}

再新建一個類,程序啟動的時候,調用上面的方法

import com.sensor.vibration.utils.Common;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.net.URISyntaxException;
import java.util.Map;


/**
 * Startup hook that launches the WebSocket client by delegating to
 * {@code ReconnectClient.reconnect()}.
 */
@Component
@Slf4j
public class InitStart implements CommandLineRunner {

    /**
     * Starts the WebSocket client.
     *
     * @param args command-line arguments; not used here
     * @throws URISyntaxException   propagated from {@code ReconnectClient.reconnect()}
     * @throws InterruptedException propagated from {@code ReconnectClient.reconnect()}
     */
    @Override
    public void run(final String... args) throws InterruptedException, URISyntaxException {
        ReconnectClient.reconnect();
    }

}

中間的啟動類的方法可以省去,直接寫在InitStart的run方法裡面

現在還不能使用Service層的方法,會報service為null異常,百度後,參考別人使用getBean方法,寫一個工具類

import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Component;


/**
 * Captures the Spring {@link ApplicationContext} in a static field so that it
 * can be reached through {@link #getApplicationContext()} from code that is not
 * managed by the container.
 */
@Lazy(false)
@Component
public class ApplicationContextRegister implements ApplicationContextAware {
    private static ApplicationContext context;

    /**
     * Stores the supplied Spring context for later static access.
     *
     * @param applicationContext the Spring application context
     * @throws BeansException declared by the overridden interface method
     */
    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        ApplicationContextRegister.context = applicationContext;
    }

    /**
     * @return the stored context, or {@code null} if none has been set yet
     */
    public static ApplicationContext getApplicationContext() {
        return ApplicationContextRegister.context;
    }

}

在 SensorWebSocketClient.java 中使用Service

// Fragment to be merged into SensorWebSocketClient: the field declaration goes
// at class level, while the two statements after it are not valid at class
// level and must be placed inside a constructor or method body.
@Autowired
private UserService userService;

// Fetch the stored Spring context and look the service bean up by type,
// assigning it to the field by hand.
ApplicationContext act = ApplicationContextRegister.getApplicationContext();
userService=act.getBean(UserService.class);

但是 領導不讓用getBean這種方法,放棄

第二版,使用Service層方法版本 + 斷線重連

實現:

import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.java_websocket.client.WebSocketClient;
import org.java_websocket.handshake.ServerHandshake;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;

import java.net.URI;

/**
 * Created by Chow on 2019/8/22
 */

@Slf4j
@Component
public class WebSocketClientStart {
    @Autowired
    private UserService userService;


    @Bean
    public WebSocketClient webSocketClient() {
        try {
            WebSocketClient webSocketClient = new WebSocketClient(new URI("ws://127.0.0.1:5005/websocket")) {
                @Override
                public void onOpen(ServerHandshake handshakedata) {
                    log.info("[websocket] 連接成功");
                }

                @Override
                public void onMessage(String msg) {
                    try{
                        log.info("[websocket] 收到消息={}",msg);

                        if (msg == null || StringUtils.isBlank(msg)){
                            log.error("the msg message of websocket received is null");
                            this.send("");
                            return;
                        }
                        JSONArray jsonArray = JSONArray.parseArray(msg);
                        if (jsonArray == null || jsonArray.size() == 0){
                            log.info("log: the message of websocket received is empty");
                        }

                        vibrationAlarmService.alarmAnalysis(jsonArray);
                        this.send("");
                    }catch (Exception e){
                        log.error(e.getMessage(), e);
                        this.send("");
                    }

                }

                @Override
                public void onClose(int code, String reason, boolean remote) {
                    log.info("[websocket] 退出連接");
                }

                @Override
                public void onError(Exception ex) {
                    log.info("[websocket] 連接錯誤={}",ex.getMessage());
                }
            };
            webSocketClient.connect();
            new Thread(new Runnable() {

                public void run() {
                    System.out.println("Runnable running..");
                }

            }) {

                public void run() {
                    while (true){
                        try{
                            Thread.sleep(3000);
                            webSocketClient.send("");
                        }catch (Exception e){
                            webSocketClient.reconnect();
                        }
                    }

                }

            }.start();
            return webSocketClient;
        } catch (Exception e) {
            e.printStackTrace();
        }
        return null;
    }


}

 

 

 測試服務端代碼

需要引入包

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-websocket</artifactId>
    <version>2.1.3.RELEASE</version>
</dependency>

server:

import java.io.IOException;
import java.util.concurrent.CopyOnWriteArraySet;

import javax.websocket.OnClose;
import javax.websocket.OnError;
import javax.websocket.OnMessage;
import javax.websocket.OnOpen;
import javax.websocket.Session;
import javax.websocket.server.ServerEndpoint;

import org.springframework.stereotype.Component;

/**
 * Test server endpoint mapped to {@code /websocket}. It tracks the connected
 * clients and broadcasts every text message it receives to all of them.
 */
@ServerEndpoint(value = "/websocket")
@Component
public class MyWebSocket {
    // Number of currently open connections; only accessed through the
    // synchronized static methods at the bottom of this class.
    private static int onlineCount = 0;

    // Thread-safe set holding the MyWebSocket object of every connected client.
    private static final CopyOnWriteArraySet<MyWebSocket> webSocketSet = new CopyOnWriteArraySet<>();

    // Session of the client served by this object; needed to send data to it.
    private Session session;

    // Serialises sends on this connection: broadcasts triggered by different
    // clients may otherwise call sendText on the same session concurrently.
    private final Object sendLock = new Object();

    /**
     * Called when a connection has been established: registers this endpoint,
     * increments the online counter and tells the new client the current count.
     *
     * @param session the session of the newly connected client
     */
    @OnOpen
    public void onOpen(Session session) {
        this.session = session;
        webSocketSet.add(this);
        addOnlineCount();
        System.out.println("有新連接加入!當前在線人數為" + getOnlineCount());
        try {
            sendMessage("當前在線人數為" + getOnlineCount());
        } catch (IOException e) {
            System.out.println("IO異常");
        }
    }

    /**
     * Called when the connection is closed: unregisters this endpoint and
     * decrements the online counter.
     */
    @OnClose
    public void onClose() {
        webSocketSet.remove(this);
        subOnlineCount();
        System.out.println("有一連接關閉!當前在線人數為" + getOnlineCount());
    }

    /**
     * Called when a text message arrives from the client; the message is
     * forwarded to every connected client.
     *
     * @param message the message sent by the client
     * @param session the session the message arrived on
     */
    @OnMessage
    public void onMessage(String message, Session session) {
        System.out.println("來自客戶端的消息:" + message);
        broadcast(message);
    }

    /**
     * Called when an error occurs on the connection; prints the stack trace.
     *
     * @param session the session the error occurred on
     * @param error   the error that was raised
     */
    @OnError
    public void onError(Session session, Throwable error) {
        System.out.println("發生錯誤");
        error.printStackTrace();
    }

    /**
     * Sends a text message to the client served by this endpoint.
     *
     * @param message the text to send
     * @throws IOException if sending fails
     */
    public void sendMessage(String message) throws IOException {
        synchronized (sendLock) {
            this.session.getBasicRemote().sendText(message);
        }
    }

    /**
     * Broadcasts a custom message to every connected client.
     *
     * @param message the text to send
     * @throws IOException kept in the signature for existing callers; failures
     *                     for individual clients are reported and do not abort
     *                     the broadcast
     */
    public static void sendInfo(String message) throws IOException {
        broadcast(message);
    }

    /**
     * Sends the message to every registered endpoint. A failure for one client
     * is printed and the loop carries on with the remaining clients.
     */
    private static void broadcast(String message) {
        for (MyWebSocket item : webSocketSet) {
            try {
                item.sendMessage(message);
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    /** @return the current number of open connections */
    public static synchronized int getOnlineCount() {
        return onlineCount;
    }

    /** Increments the number of open connections by one. */
    public static synchronized void addOnlineCount() {
        MyWebSocket.onlineCount++;
    }

    /** Decrements the number of open connections by one. */
    public static synchronized void subOnlineCount() {
        MyWebSocket.onlineCount--;
    }
}

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM