Netty作為一個高性能的異步網絡開發框架,可以作為各種服務的開發框架。
前段時間的一個項目涉及到硬件設備實時數據的采集,采用Netty作為采集服務的實現框架,同時使用RabbitMQ作為采集服務和各個其他模塊的通信消息隊列,整個服務框架圖如下:
將業務代碼和實際協議解析部分的代碼抽離,得到以上一個簡單的設計圖,代碼開源在GitHub上,簡單介紹下NettyMQServer采集服務涉及到的幾個關鍵技術點:
1、設備TCP消息解析:
NettyMQServer和采集設備Device之間采用TCP通信,TCP消息的解析可以使用LengthFieldBasedFrameDecoder(消息頭和消息體),可以有效的解決TCP消息“粘包”問題。
消息包解析圖如下:bytes length field at offset 0, do not strip header, the length field represents the length of the whole message
lengthFieldOffset = 0 lengthFieldLength = 2 lengthAdjustment = -2 (= the length of the Length field) initialBytesToStrip = 0 BEFORE DECODE (14 bytes) AFTER DECODE (14 bytes) +--------+----------------+ +--------+----------------+ | Length | Actual Content |----->| Length | Actual Content | | 0x000E | "HELLO, WORLD" | | 0x000E | "HELLO, WORLD" | +--------+----------------+ +--------+----------------+
代碼中消息長度的存儲采用了4個字節,采用LengthFieldBasedFrameDecoder(65535,0,4,-4,0)解碼,Netty會從接收的數據中頭4個字節中得到消息的長度,進而得到一個TCP消息包。
2、給設備發消息:
首先在連接創建時,要保留TCP的連接:
static final ChannelGroup channels = new DefaultChannelGroup( GlobalEventExecutor.INSTANCE); @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { // A closed channel will be removed from ChannelGroup automatically channels.add(ctx.channel()); }
在每次一個Channel Active(連接創建)的時候用ChannelGroup保存這個Channel連接,當需要給某個設備發消息的時候,可以遍歷該ChannelGroup,找到對應的Channel,給該Channel發送消息:
for (io.netty.channel.Channel c : EchoServerHandler.channels) { ByteBuf msg = Unpooled.copiedBuffer(message.getBytes()); c.writeAndFlush(msg); }
這里是給所有的連接的設備都發。當連接斷開的時候,ChannelGroup會自動remove掉這個連接,不需要我們手動管理。
3、心跳檢測
當某個設備Device由於斷電或是其他原因導致設備不正常無法采集數據,Netty服務端需要知道該設備是否在正常工作,可以使用Netty的IdleStateHandler,示例代碼如下:
// 3 minutes for read idle ch.pipeline().addLast(new IdleStateHandler(3*60,0,0)); ch.pipeline().addLast(new HeartBeatHandler()); /** * Handler implementation for heart beating. */ public class HeartBeatHandler extends ChannelInboundHandlerAdapter{ @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { if (evt instanceof IdleStateEvent) { IdleStateEvent event = (IdleStateEvent) evt; if (event.state() == IdleState.READER_IDLE) { // Read timeout System.out.println("READER_IDLE: read timeout from "+ctx.channel().remoteAddress()); //ctx.disconnect(); //Channel disconnect } } } }
上面設置3分鍾沒有讀到數據,則觸發一個READER_IDLE事件。
4、RabbitMQ消息接收與發送
NettyMQServer消息發送采用了Spring AMQP,只需要在配置文件中簡單配置一下,就可以方便使用。
NettyMQServer消息接收同樣可以采用Spring AMQP,但由於對Spring相關的配置不是很熟悉,為了更靈活的使用MQ,這里使用了RabbitMQ Client Java API來實現:
Connection connection = connnectionFactory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(exchangeName, "direct", true, false, null); channel.queueDeclare(queueName, true, false, false, null); channel.queueBind(queueName, exchangeName, routeKey); // process the message one by one channel.basicQos(1); QueueingConsumer queueingConsumer = new QueueingConsumer(channel); // auto-ack is false channel.basicConsume(queueName, false, queueingConsumer); while (true) { QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery(); String message = new String(delivery.getBody()); log.debug("Mq Receiver get message"); // Send the message to all connected clients // If you want to send to a specified client, just add // your own logic and ack manually // Be aware that ChannelGroup is thread safe log.info(String.format("Conneted client number: %d",EchoServerHandler.channels.size())); for (io.netty.channel.Channel c : EchoServerHandler.channels) { ByteBuf msg = Unpooled.copiedBuffer(message.getBytes()); c.writeAndFlush(msg); } // manually ack to MQ server the message is consumed. channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
以上代碼從一個Queue中讀取數據,為了有效處理數據,防止異常數據丟失,使用了手動Ack。
RabbitMQ的使用方式:http://www.cnblogs.com/luxiaoxun/p/3918054.html
代碼托管在GitHub上:https://github.com/luxiaoxun/Code4Java
參考:
http://netty.io/
http://netty.io/4.0/api/io/netty/handler/codec/LengthFieldBasedFrameDecoder.html
http://netty.io/4.0/api/io/netty/handler/timeout/IdleStateHandler.html