Netty+WebSocket 獲取火幣交易所數據項目


Netty+WebSocket 獲取火幣交易所時時數據項目

先附上項目項目GitHub地址 spring-boot-netty-websocket-huobi

項目簡介

本項目使用 SpringBoot+Netty來開發WebSocket服務器,與火幣交易所Websocket建立連接,時時獲取火幣網交易所推送過來的交易對最新數據

該項目可以直接運用於實際開發中,做為獲取各大交易所最新交易對相關數據的項目。

項目本身也是我在之前公司為了獲取各大交易所數據所開發的項目,現在只是重新整理了下代碼,現在它更像一個腳手架項目,可以在此基礎上很方便的添加其它交易所。

技術架構

SpringBoot2.1.5 +Netty4.1.25 + Maven3.5.4 + lombok(插件)

項目測試

直接啟動Springboot啟動類Application.java,就可以時時獲取火幣網推送過來交易對的數據了。

如圖


一、項目概述

1、項目啟動入口

在項目啟動的時候就開始去連接火幣交易所Websocket訂閱數據。

   /**
     * 首次啟動並訂閱火幣websocket數據
     */
    @PostConstruct
    public void firstSub() {
        try {
            huobiProMainService.start();
        } catch (Exception e) {
            log.error("huobi 首次啟動訂閱異常", e);
        }
    }

2、獲取交易對數據

我們是先要獲取火幣交易所所有的交易對數據,然后告訴火幣交易所我需要訂閱哪些交易對數據。

是訂閱所有交易對數據還是訂閱部分交易對數據。

    @Override
    public synchronized List<String> getChannelCache() {
        // 假設這里是從遠處拉取交易對數據
        List<String> list = Lists.newArrayList("btcusdt");
        return list;
    }

3、連接火幣交易所Websocket,並訂閱指定的交易對。

先與火幣網WebSocket建立連接,連接成功后再告訴它我要訂閱哪些交易對,哪種主題,成功后,火幣交易所就會根據我們所訂閱的主題和交易對,給我們時時推送消息。

 /**
     * 首次訂閱交易對數據
     *
     * @param channelList 交易對列表
     * @param topicFormat 交易對訂閱主題格式
     */
    private void firstSub(List<String> channelList, String topicFormat) {
        //封裝huoBiProWebSocketService對象
        klineClient = new HuoBiProWebSocketClient(huoBiProWebSocketService);
        //啟動連接火幣網websocket
        klineClient.start();
        for (String channel : channelList) {
            //訂閱具體交易對
            klineClient.addSub(formatChannel(topicFormat, channel));
        }
    }

啟動連接火幣網websocket核心代碼

很明顯我們我們是作為客戶端去獲取服務端的數據,所以這里的Bootstrap來與服務端進行數據交互,而不是用ServerBootstrap

還有一點就是作為客戶端,我們是要獲取服務端所推送來的消息,所以我們自定義的handler是入站Handler,所以這里選擇的是SimpleChannelInboundHandler

   /**
         * 連接WebSocket,
         *
         * @param uri url構造出URI
         * @param handler 處理消息
         */
        protected void connectWebSocket(final URI uri, SimpleChannelInboundHandler handler) {
            try {
                String scheme = uri.getScheme() == null ? "http" : uri.getScheme();
                final String host = uri.getHost() == null ? "127.0.0.1" : uri.getHost();
                final int port;

                if (uri.getPort() == -1) {
                    if ("http".equalsIgnoreCase(scheme) || "ws".equalsIgnoreCase(scheme)) {
                        port = 80;
                    } else if ("wss".equalsIgnoreCase(scheme)) {
                        port = 443;
                    } else {
                        port = -1;
                    }
                } else {
                    port = uri.getPort();
                }

                if (!"ws".equalsIgnoreCase(scheme) && !"wss".equalsIgnoreCase(scheme)) {
                    System.out.println("Only WS(S) is supported");
                    throw new UnsupportedAddressTypeException();
                }
                final boolean ssl = "wss".equalsIgnoreCase(scheme);
                final SslContext sslCtx;
                if (ssl) {
                    sslCtx = SslContextBuilder.forClient().trustManager(InsecureTrustManagerFactory.INSTANCE).build();
                } else {
                    sslCtx = null;
                }

                group = new NioEventLoopGroup(2);
                //構建客戶端Bootstrap
                Bootstrap bootstrap = new Bootstrap();
                bootstrap.group(group).channel(NioSocketChannel.class).handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {
                        ChannelPipeline pipeline = ch.pipeline();
                        if (sslCtx != null) {
                            pipeline.addLast(sslCtx.newHandler(ch.alloc(), host, port));
                        }
                        //pipeline可以同時放入多個handler,最后一個為自定義hanler
                        pipeline.addLast(new HttpClientCodec(), new HttpObjectAggregator(8192), handler);
                    }
                });
                channel = bootstrap.connect(host, port).sync().channel();
            } catch (Exception e) {
                log.error(" webSocketClient start error.", e);
                if (group != null) {
                    group.shutdownGracefully();
                }
            }
    }

4、自定義handler

自定義Handler才是核心,作為數據的入站這里選擇繼承SimpleChannelInboundHandler,繼承它必須要實現一個方法就是channelRead0,通過該方法的msg,就可以獲取火幣交易所時時推送過來的消息了。

/**
 * @Description: 火幣網WebSocket 消息處理類
 * 自定義入站的handler 這個也是核心類
 */
@Slf4j
public class HuoBiProWebSocketClientHandler extends SimpleChannelInboundHandler<Object> {

    private WebSocketClientHandshaker handshaker;
    private HuoBiProWebSocketClient client;
    
    /**
     * 該handel獲取消息的方法
     */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
        Channel channel = ctx.channel();
        WebSocketFrame frame = (WebSocketFrame) msg;
        if (frame instanceof BinaryWebSocketFrame) {
            //火幣網的數據是壓縮過的,所以需要我們進行解壓
            BinaryWebSocketFrame binaryFrame = (BinaryWebSocketFrame) frame;
            //獲取數據、保存數據
            client.onReceive(decodeByteBuf(binaryFrame.content()));
        } else if (frame instanceof TextWebSocketFrame) {
            TextWebSocketFrame textWebSocketFrame = (TextWebSocketFrame) frame;
            client.onReceive(textWebSocketFrame.text());
        } 
     }
  }

二、項目注意點

1、服務器問題

一般交易所的服務器都在國外,所以我們本地是無法建立Websocket連接的,除非本地翻牆。

同樣項目也不能部署到阿里雲等國內服務器,你只能選擇香港或者國外服務器部署項目。

這里是火幣網專門為我們提供的國內測試地址,所以本地可以獲取數據。

2、獲取交易所最新交易對數據問題

我們在向交易所Websocket訂閱交易對的時候,首先就是要知道該交易所有哪些交易對,這份數據是需要我們單獨去獲取的,而且不是一次獲取就好了。

因為該交易所可能新增或者刪除交易對。所以需要我們通過定時任務去獲取更新最新的交易對數據。

我這邊只是模擬了一個交易對btcusdt,並沒有提供獲取最新交易對數據的服務。

3、數據存儲問題

這也是最值得思考的一個問題,數據我們是獲取了,但如何保存!

正常合理的開發應該獲取數據是一個微服務,處理獲取的數據是一個微服務。那么只需要獲取數據后去調處理數據微服務就可以保存數據了。

但在這里,如果只是這樣是行不通的。

因為火幣網向我們推送的消息的速度會比我們調其它服務保存的數據要快,這就會存在數據丟失的情況發生

這里僅僅是輸出一個btcusdt交易對,並且只是訂閱一個k線主題,而實際上交易所會有上百個交易對和幾種訂閱主題,

這樣的消息推送速度是上面的幾百倍。所以你會發現如果你不做任何改動,對於一些大的交易所而言,你的數據是來不及存儲的。


補充

這邊之前也寫過有關 Netty 和 Websocket 相關的博客文章,可以做個參考

1、Netty專題(共9篇)

2、Websocket專題(共5篇)



 我相信,無論今后的道路多么坎坷,只要抓住今天,遲早會在奮斗中嘗到人生的甘甜。抓住人生中的一分一秒,勝過虛度中的一月一年!(1)


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM