業務需求: 需要一個客戶端主動向多個服務端設備發送指定數據,然後接收服務端設備返回的十六進制數據,解析並保存.
1.maven依賴
<dependency> <groupId>io.netty</groupId> <artifactId>netty-all</artifactId> <version>4.1.10.Final</version> </dependency>
2.NettyConfig
項目啟動時,初始化所有與服務端的連接並保存
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import javax.annotation.PostConstruct;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/**
 * Spring component that opens the Netty client connections once this bean
 * has been constructed and injected.
 */
@Component
public class NettyConfig {

    private static final Logger log = LoggerFactory.getLogger(NettyConfig.class);

    @Autowired
    IServiceDeviceWarterInfo serviceDeviceWarterInfo;

    /**
     * Initializes the device connections.
     *
     * <p>Loads the device rows through
     * {@code selectDeviceWarterInfoByIPPortNotNull} (called with an empty
     * parameter map), hands them to {@link NettyClient#getChannel(List)} and
     * stores the returned map in {@code NettyClient.channels}.
     *
     * <p>Any failure is logged and swallowed so that bean initialization does
     * not fail; in that case {@code NettyClient.channels} keeps its previous
     * value.
     */
    @PostConstruct
    public void getDeviceWarterInfo() {
        try {
            Map<String, Object> mapParam = new HashMap<>();
            List<Map<String, Object>> list =
                    serviceDeviceWarterInfo.selectDeviceWarterInfoByIPPortNotNull(mapParam);
            NettyClient.channels = NettyClient.getChannel(list);
        } catch (Exception e) {
            // Best effort: report through the logger (with stack trace) instead of stderr.
            log.error("Failed to initialize the device netty connections", e);
        }
    }
}
3.NettyClient
import java.net.InetSocketAddress; import java.text.SimpleDateFormat; import java.util.Date; import java.util.HashMap; import java.util.List; import java.util.Map;
import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.alibaba.fastjson.JSONObject;
import io.netty.bootstrap.Bootstrap; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoop; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; public class NettyClient { private static Logger log = LoggerFactory.getLogger(NettyClient.class); public static Map<String, ChannelFuture> channels = null; static EventLoopGroup eventLoopGroup = new NioEventLoopGroup(); /** * 初始化Bootstrap */ public static final Bootstrap getBootstrap(EventLoopGroup group) { if (null == group) { group = eventLoopGroup; } Bootstrap bootstrap = new Bootstrap(); bootstrap.group(group) .channel(NioSocketChannel.class) .option(ChannelOption.TCP_NODELAY, true) .option(ChannelOption.SO_KEEPALIVE, true) .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { socketChannel.pipeline().addLast(new NettyClientHandler()); } }); return bootstrap; }
//二次重連 public static final Map<String , ChannelFuture> twoGetChannel(List<Map<String,Object>> portHosts){ eventLoopGroup.shutdownGracefully(); eventLoopGroup = new NioEventLoopGroup(); return getChannel(portHosts); } // 獲取所有連接 public static final Map<String , ChannelFuture> getChannel(List<Map<String,Object>> portHosts) { Map<String , ChannelFuture> result = new HashMap<>(); Bootstrap bootstrap = getBootstrap(null); for (Map<String, Object> map : portHosts) { String host = (String) map.get("IP"); int port = Integer.valueOf((String) map.get("PORT")); bootstrap.remoteAddress(host, port); //異步連接tcp服務端 ChannelFuture future = bootstrap.connect().addListener((ChannelFuture futureListener) -> { final EventLoop eventLoop = futureListener.channel().eventLoop(); if (!futureListener.isSuccess()) { log.info("與"+host+":"+port+"連接失敗!");
//與設備的連接失敗-推送mqtt主題 Map<String, Object> pushMessage = new HashMap<String,Object>(); pushMessage.put("IP", host); pushMessage.put("PORT", port); pushMessage.put("STATUS", "0");//0-離線 1-正常 2-報警 pushMessage.put("CJSJ", new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date())); MqttPushClient.publish("推送主題", JSONObject.toJSONString(pushMessage)); } }); result.put(host+":"+port, future); } return result; } //發送消息 public static void sendMsg(ChannelFuture future, String msg) throws Exception { if (future != null && future.channel().isActive()) { Channel channel = future.channel(); InetSocketAddress ipSocket = (InetSocketAddress) channel.remoteAddress(); String host = ipSocket.getHostString(); int port = ipSocket.getPort(); System.out.println("向設備" + host+":"+port+"發送數據");
//項目封裝的util類 byte[] msgbytes = ByteUtils.hexStringToBytes(msg); ByteBuf buf = Unpooled.buffer(); buf.writeBytes(msgbytes); // 2.寫數據 future.channel().writeAndFlush(buf).sync(); } } }
4.NettyClientHandler
import java.net.InetSocketAddress; import java.text.SimpleDateFormat; import java.util.Date; import java.util.concurrent.atomic.AtomicInteger; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.channel.EventLoop; public class NettyClientHandler extends ChannelInboundHandlerAdapter { private static Logger log = LoggerFactory.getLogger(NettyClientHandler.class); /** * 建立連接時 */ @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { InetSocketAddress ipSocket = (InetSocketAddress) ctx.channel().remoteAddress(); int port = ipSocket.getPort(); String host = ipSocket.getHostString(); log.info("與設備"+host+":"+port+"連接成功!"); ctx.fireChannelActive(); } /** * 關閉連接時 */ @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { InetSocketAddress ipSocket = (InetSocketAddress) ctx.channel().remoteAddress(); int port = ipSocket.getPort(); String host = ipSocket.getHostString(); log.error("與設備"+host+":"+port+"連接斷開!"); final EventLoop eventLoop = ctx.channel().eventLoop(); } /** * 業務邏輯處理 */ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) { InetSocketAddress ipSocket = (InetSocketAddress) ctx.channel().remoteAddress(); int port = ipSocket.getPort(); String host = ipSocket.getHostString(); ByteBuf byteBuf = (ByteBuf) msg; byte[] bytes = new byte[byteBuf.readableBytes()]; byteBuf.readBytes(bytes); String result= ByteUtils.bytesToHexString(bytes);//項目封裝的util類 log.info("接受到 " + host + ":"+port+"的數據: "+ result); String datetime = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()); //數據解析 } }
5.定時向服務端發送數據
import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Set; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; import io.netty.channel.ChannelFuture; @Component public class NettyTaskTimer { private static Logger log = LoggerFactory.getLogger(NettyTaskTimer.class); @Autowired IServiceDeviceWarterInfo serviceDeviceWarterInfo; /** * 定時更新設備netty連接 */ @Scheduled(cron = "0 0 0/1 * * ?") public void getDeviceWarterInfo() { try { Map<String,Object> mapParam =new HashMap<String, Object>(); List<Map<String, Object>> list = serviceDeviceWarterInfo.selectDeviceWarterInfoByIPPortNotNull(mapParam); NettyClient.channels = NettyClient.twoGetChannel(list); log.info("更新設備netty連接成功!"); }catch (Exception e) { e.printStackTrace(); } } /** * 通過netty客戶端定時向所有長連接的設備請求數據 * 每整點分鍾獲取一次,間隔1分鍾 */ @Scheduled(cron = "0 0/1 * * * ?") private void instantaneousFlow() { try { Map<String, ChannelFuture> channels = NettyClient.channels; Set<String> keySet = channels.keySet(); for (String key : keySet) { ChannelFuture channel = channels.get(key); NettyClient.sendMsg(channel,"發送的數據"); } } catch (Exception e) { e.printStackTrace(); } } }