业务需求: 需要一个客户端主动向多个服务端设备发送指定数据,然后接收服务端设备返回的十六进制数据,解析并保存.
1.maven依赖
<dependency> <groupId>io.netty</groupId> <artifactId>netty-all</artifactId> <version>4.1.10.Final</version> </dependency>
2.NettyConfig
项目启动时,初始化所有与服务端的连接并保存
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import javax.annotation.PostConstruct;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/**
 * Spring component that opens the Netty client connections once the bean has
 * been constructed and stores them in {@link NettyClient#channels}.
 */
@Component
public class NettyConfig {

    private static final Logger log = LoggerFactory.getLogger(NettyConfig.class);

    @Autowired
    IServiceDeviceWarterInfo serviceDeviceWarterInfo;

    /**
     * Initializes the device connections.
     *
     * <p>Loads the device list from the service (per the method name and the
     * original comment, presumably the rows whose host and port are not
     * empty), asks {@link NettyClient#getChannel} to connect to each of them
     * and publishes the resulting futures in {@link NettyClient#channels}.
     *
     * <p>Any failure is logged and swallowed on purpose so that a database or
     * network problem does not prevent the application context from starting.
     */
    @PostConstruct
    public void getDeviceWarterInfo() {
        try {
            Map<String, Object> mapParam = new HashMap<String, Object>();
            List<Map<String, Object>> list =
                    serviceDeviceWarterInfo.selectDeviceWarterInfoByIPPortNotNull(mapParam);
            NettyClient.channels = NettyClient.getChannel(list);
        } catch (Exception e) {
            // Route the failure through the logger (with stack trace) instead of stderr.
            log.error("初始化设备netty连接失败!", e);
        }
    }
}
3.NettyClient
import java.net.InetSocketAddress; import java.text.SimpleDateFormat; import java.util.Date; import java.util.HashMap; import java.util.List; import java.util.Map;
import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.alibaba.fastjson.JSONObject;
import io.netty.bootstrap.Bootstrap; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoop; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; public class NettyClient { private static Logger log = LoggerFactory.getLogger(NettyClient.class); public static Map<String, ChannelFuture> channels = null; static EventLoopGroup eventLoopGroup = new NioEventLoopGroup(); /** * 初始化Bootstrap */ public static final Bootstrap getBootstrap(EventLoopGroup group) { if (null == group) { group = eventLoopGroup; } Bootstrap bootstrap = new Bootstrap(); bootstrap.group(group) .channel(NioSocketChannel.class) .option(ChannelOption.TCP_NODELAY, true) .option(ChannelOption.SO_KEEPALIVE, true) .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { socketChannel.pipeline().addLast(new NettyClientHandler()); } }); return bootstrap; }
//二次重连 public static final Map<String , ChannelFuture> twoGetChannel(List<Map<String,Object>> portHosts){ eventLoopGroup.shutdownGracefully(); eventLoopGroup = new NioEventLoopGroup(); return getChannel(portHosts); } // 获取所有连接 public static final Map<String , ChannelFuture> getChannel(List<Map<String,Object>> portHosts) { Map<String , ChannelFuture> result = new HashMap<>(); Bootstrap bootstrap = getBootstrap(null); for (Map<String, Object> map : portHosts) { String host = (String) map.get("IP"); int port = Integer.valueOf((String) map.get("PORT")); bootstrap.remoteAddress(host, port); //异步连接tcp服务端 ChannelFuture future = bootstrap.connect().addListener((ChannelFuture futureListener) -> { final EventLoop eventLoop = futureListener.channel().eventLoop(); if (!futureListener.isSuccess()) { log.info("与"+host+":"+port+"连接失败!");
//与设备的连接失败-推送mqtt主题 Map<String, Object> pushMessage = new HashMap<String,Object>(); pushMessage.put("IP", host); pushMessage.put("PORT", port); pushMessage.put("STATUS", "0");//0-离线 1-正常 2-报警 pushMessage.put("CJSJ", new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date())); MqttPushClient.publish("推送主题", JSONObject.toJSONString(pushMessage)); } }); result.put(host+":"+port, future); } return result; } //发送消息 public static void sendMsg(ChannelFuture future, String msg) throws Exception { if (future != null && future.channel().isActive()) { Channel channel = future.channel(); InetSocketAddress ipSocket = (InetSocketAddress) channel.remoteAddress(); String host = ipSocket.getHostString(); int port = ipSocket.getPort(); System.out.println("向设备" + host+":"+port+"发送数据");
//项目封装的util类 byte[] msgbytes = ByteUtils.hexStringToBytes(msg); ByteBuf buf = Unpooled.buffer(); buf.writeBytes(msgbytes); // 2.写数据 future.channel().writeAndFlush(buf).sync(); } } }
4.NettyClientHandler
import java.net.InetSocketAddress; import java.text.SimpleDateFormat; import java.util.Date; import java.util.concurrent.atomic.AtomicInteger; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.channel.EventLoop; public class NettyClientHandler extends ChannelInboundHandlerAdapter { private static Logger log = LoggerFactory.getLogger(NettyClientHandler.class); /** * 建立连接时 */ @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { InetSocketAddress ipSocket = (InetSocketAddress) ctx.channel().remoteAddress(); int port = ipSocket.getPort(); String host = ipSocket.getHostString(); log.info("与设备"+host+":"+port+"连接成功!"); ctx.fireChannelActive(); } /** * 关闭连接时 */ @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { InetSocketAddress ipSocket = (InetSocketAddress) ctx.channel().remoteAddress(); int port = ipSocket.getPort(); String host = ipSocket.getHostString(); log.error("与设备"+host+":"+port+"连接断开!"); final EventLoop eventLoop = ctx.channel().eventLoop(); } /** * 业务逻辑处理 */ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) { InetSocketAddress ipSocket = (InetSocketAddress) ctx.channel().remoteAddress(); int port = ipSocket.getPort(); String host = ipSocket.getHostString(); ByteBuf byteBuf = (ByteBuf) msg; byte[] bytes = new byte[byteBuf.readableBytes()]; byteBuf.readBytes(bytes); String result= ByteUtils.bytesToHexString(bytes);//项目封装的util类 log.info("接受到 " + host + ":"+port+"的数据: "+ result); String datetime = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()); //数据解析 } }
5.定时向服务端发送数据
import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Set; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; import io.netty.channel.ChannelFuture; @Component public class NettyTaskTimer { private static Logger log = LoggerFactory.getLogger(NettyTaskTimer.class); @Autowired IServiceDeviceWarterInfo serviceDeviceWarterInfo; /** * 定时更新设备netty连接 */ @Scheduled(cron = "0 0 0/1 * * ?") public void getDeviceWarterInfo() { try { Map<String,Object> mapParam =new HashMap<String, Object>(); List<Map<String, Object>> list = serviceDeviceWarterInfo.selectDeviceWarterInfoByIPPortNotNull(mapParam); NettyClient.channels = NettyClient.twoGetChannel(list); log.info("更新设备netty连接成功!"); }catch (Exception e) { e.printStackTrace(); } } /** * 通过netty客户端定时向所有长连接的设备请求数据 * 每整点分钟获取一次,间隔1分钟 */ @Scheduled(cron = "0 0/1 * * * ?") private void instantaneousFlow() { try { Map<String, ChannelFuture> channels = NettyClient.channels; Set<String> keySet = channels.keySet(); for (String key : keySet) { ChannelFuture channel = channels.get(key); NettyClient.sendMsg(channel,"发送的数据"); } } catch (Exception e) { e.printStackTrace(); } } }