UDP接收百萬級數據的解決方案


任務:有個發送方,會通過udp發送一些信息,然后服務接收到信息后保存到數據庫的一張表A,保存的這些數據在經過一系列處理,處理完成后累積到另一張表B,然后清空處理的表A的數據。目前發送方比較少,不久就要增加到100個。

 

我采用netty5來進行udp的網絡通訊,將接收到的數據保存到BlockingQueue中,然后讀取BlockingQueue中的數據,取到100條就存到hbase數據庫中。

初始化netty

 

int DEFAULT_PORT = 6000;
EventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(group).channel(NioDatagramChannel.class).option(ChannelOption.SO_BROADCAST, true)
                    .handler(new UdpServerHandler());
            Channel channel = bootstrap.bind(DEFAULT_PORT).sync().channel();
            channel.closeFuture().await();
            LOGGER.info("netty初始化成功!");
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            group.shutdownGracefully();
        }

 

 接收udp數據

public BlockingQueue<Map<String, Object>> queue = <br>new LinkedBlockingQueue<Map<String, Object>>(990000);
protected void messageReceived(ChannelHandlerContext ctx, DatagramPacket msg) throws Exception {
            // 因為Netty對UDP進行了封裝,所以接收到的是DatagramPacket對象。
            String result = msg.content().toString(CharsetUtil.UTF_8);
             
            Map<String, Object> getMap = new HashMap<String, Object>();
//處理數據
             
                    <br>queue.put(getMap);
                 
            <br>ctx.writeAndFlush(new DatagramPacket(<br>Unpooled.copiedBuffer("結果:", CharsetUtil.UTF_8), msg.sender()));

 讀取數據存hbase

public void getDate() {
        LOGGER.info("開始取數據");
        List<Map<String, Object>> jsonList = new ArrayList<Map<String, Object>>();
            while (true) {
                Map<String, Object> takeMap = null;
                try {
                    takeMap = queue.take();
                    if (takeMap == null) {
                        continue;
                    }
                    jsonList.add(takeMap);
                    if (jsonList.size() == 100) {
                        String httpJson = HbaseUtil.toHttpJson(vo.getTableName(), jsonList);
                        LOGGER.info(httpJson);
                        List<HbaseDataEntity> hbaseDatas =ParseJson.getData(httpJson);
                        HbaseAPI.insertDataList(hbaseDatas);
                        jsonList.clear();
                        LOGGER.info("hbase存了100條");
                    }
                } catch (Exception e) {
                    jsonList.clear();
                    continue;
                }
            }
 
        }

 

  • BlockingQueue一定要設置大小,不設置是int最大值,有可能會內存溢出;
  • 從BlockingQueue取數據的時候一定要阻塞式取take(),負責會死循環,占CPU100%;
  • hbase庫連接時是阻塞式的,如果連接不上會一直阻塞。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM