springboot+netty單客戶端連接多個不同的服務端


業務需求: 需要一個客戶端主動向多個服務端設備發送指定數據,然后接受服務端設備返回的十六進制數據,解析並保存.

1.maven依賴

        <dependency>
            <groupId>io.netty</groupId>
            <artifactId>netty-all</artifactId>
            <version>4.1.10.Final</version>
        </dependency>

2.NettyConfig

   項目啟動時,初始化所有與服務端的連接並保存

import java.util.HashMap;
import java.util.List;
import java.util.Map;

import javax.annotation.PostConstruct;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;


@Component
public class NettyConfig {

    private static Logger log = LoggerFactory.getLogger(NettyConfig.class);
    
    @Autowired
    IServiceDeviceWarterInfo serviceDeviceWarterInfo;
    
    /**
     * 初始化連接
     * 查詢數據庫表中所有host和port不為空的設備
     * netty客戶端嘗試連接並保存至channels內存中
     */
    @PostConstruct
     public void getDeviceWarterInfo() {
            try {
                Map<String,Object> mapParam =new HashMap<String, Object>();
                List<Map<String, Object>> list = serviceDeviceWarterInfo.selectDeviceWarterInfoByIPPortNotNull(mapParam);
                
                NettyClient.channels = NettyClient.getChannel(list);
                
            }catch (Exception e) {
               e.printStackTrace();
            }

        }
    
}

3.NettyClient

import java.net.InetSocketAddress;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.alibaba.fastjson.JSONObject;
import io.netty.bootstrap.Bootstrap; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoop; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; public class NettyClient { private static Logger log = LoggerFactory.getLogger(NettyClient.class); public static Map<String, ChannelFuture> channels = null; static EventLoopGroup eventLoopGroup = new NioEventLoopGroup(); /** * 初始化Bootstrap */ public static final Bootstrap getBootstrap(EventLoopGroup group) { if (null == group) { group = eventLoopGroup; } Bootstrap bootstrap = new Bootstrap(); bootstrap.group(group) .channel(NioSocketChannel.class) .option(ChannelOption.TCP_NODELAY, true) .option(ChannelOption.SO_KEEPALIVE, true) .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { socketChannel.pipeline().addLast(new NettyClientHandler()); } }); return bootstrap; }
//二次重連
public static final Map<String , ChannelFuture> twoGetChannel(List<Map<String,Object>> portHosts){ eventLoopGroup.shutdownGracefully(); eventLoopGroup = new NioEventLoopGroup(); return getChannel(portHosts); } // 獲取所有連接 public static final Map<String , ChannelFuture> getChannel(List<Map<String,Object>> portHosts) { Map<String , ChannelFuture> result = new HashMap<>(); Bootstrap bootstrap = getBootstrap(null); for (Map<String, Object> map : portHosts) { String host = (String) map.get("IP"); int port = Integer.valueOf((String) map.get("PORT")); bootstrap.remoteAddress(host, port); //異步連接tcp服務端 ChannelFuture future = bootstrap.connect().addListener((ChannelFuture futureListener) -> { final EventLoop eventLoop = futureListener.channel().eventLoop(); if (!futureListener.isSuccess()) { log.info("與"+host+":"+port+"連接失敗!");

//與設備的連接失敗-推送mqtt主題 Map<String, Object> pushMessage = new HashMap<String,Object>(); pushMessage.put("IP", host); pushMessage.put("PORT", port); pushMessage.put("STATUS", "0");//0-離線 1-正常 2-報警 pushMessage.put("CJSJ", new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date())); MqttPushClient.publish("推送主題", JSONObject.toJSONString(pushMessage)); } }); result.put(host+":"+port, future); } return result; } //發送消息 public static void sendMsg(ChannelFuture future, String msg) throws Exception { if (future != null && future.channel().isActive()) { Channel channel = future.channel(); InetSocketAddress ipSocket = (InetSocketAddress) channel.remoteAddress(); String host = ipSocket.getHostString(); int port = ipSocket.getPort(); System.out.println("向設備" + host+":"+port+"發送數據");
//項目封裝的util類
byte[] msgbytes = ByteUtils.hexStringToBytes(msg); ByteBuf buf = Unpooled.buffer(); buf.writeBytes(msgbytes); // 2.寫數據 future.channel().writeAndFlush(buf).sync(); } } }

 

4.NettyClientHandler

import java.net.InetSocketAddress;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.atomic.AtomicInteger;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.EventLoop;
 
public class NettyClientHandler extends ChannelInboundHandlerAdapter {
    
    private static Logger log = LoggerFactory.getLogger(NettyClientHandler.class);
     /**
     * 建立連接時
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        InetSocketAddress ipSocket = (InetSocketAddress) ctx.channel().remoteAddress();
        int port = ipSocket.getPort();
        String host = ipSocket.getHostString();
        log.info("與設備"+host+":"+port+"連接成功!");
        ctx.fireChannelActive();
    }
 
    /**
     * 關閉連接時
     */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
 
        InetSocketAddress ipSocket = (InetSocketAddress) ctx.channel().remoteAddress();
        int port = ipSocket.getPort();
        String host = ipSocket.getHostString();
        log.error("與設備"+host+":"+port+"連接斷開!");
        final EventLoop eventLoop = ctx.channel().eventLoop();
         
    }
 
 
    /**
     * 業務邏輯處理
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        InetSocketAddress ipSocket = (InetSocketAddress) ctx.channel().remoteAddress();
        int port = ipSocket.getPort();
        String host = ipSocket.getHostString();
        
        ByteBuf byteBuf = (ByteBuf) msg;
        byte[] bytes = new byte[byteBuf.readableBytes()];
        byteBuf.readBytes(bytes);
        String result= ByteUtils.bytesToHexString(bytes);//項目封裝的util類
        
        log.info("接受到 " + host + ":"+port+"的數據: "+ result);
        String datetime = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date());
        //數據解析
        
    }
 
}

 

5.定時向服務端發送數據

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;


import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import io.netty.channel.ChannelFuture;


@Component
public class NettyTaskTimer {
    
    private static Logger log = LoggerFactory.getLogger(NettyTaskTimer.class);
    
    @Autowired
    IServiceDeviceWarterInfo serviceDeviceWarterInfo;
    
    /**
     * 定時更新設備netty連接
     */
    @Scheduled(cron = "0 0 0/1 * * ?")
    public void getDeviceWarterInfo() {
        try {
            Map<String,Object> mapParam =new HashMap<String, Object>();
            List<Map<String, Object>> list = serviceDeviceWarterInfo.selectDeviceWarterInfoByIPPortNotNull(mapParam);
            
            NettyClient.channels = NettyClient.twoGetChannel(list);
            log.info("更新設備netty連接成功!");
        }catch (Exception e) {
           e.printStackTrace();
        }

    }
    
    /**
     * 通過netty客戶端定時向所有長連接的設備請求數據
     * 每整點分鍾獲取一次,間隔1分鍾
     */
    @Scheduled(cron = "0 0/1 * * * ?")
    private void instantaneousFlow() {
        try {
            
            Map<String, ChannelFuture> channels = NettyClient.channels;
            Set<String> keySet = channels.keySet();
            for (String key : keySet) {
                ChannelFuture channel = channels.get(key);
                NettyClient.sendMsg(channel,"發送的數據");
            }

        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    
}

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM