任務:有個發送方,會通過udp發送一些信息,然后服務接收到信息后保存到數據庫的一張表A,保存的這些數據在經過一系列處理,處理完成后累積到另一張表B,然后清空處理的表A的數據。目前發送方比較少,不久就要增加到100個。
我采用netty5來進行udp的網絡通訊,將接收到的數據保存到BlockingQueue中,然后讀取BlockingQueue中的數據,取到100條就存到hbase數據庫中。
初始化netty
int DEFAULT_PORT = 6000; EventLoopGroup group = new NioEventLoopGroup(); try { Bootstrap bootstrap = new Bootstrap(); bootstrap.group(group).channel(NioDatagramChannel.class).option(ChannelOption.SO_BROADCAST, true) .handler(new UdpServerHandler()); Channel channel = bootstrap.bind(DEFAULT_PORT).sync().channel(); channel.closeFuture().await(); LOGGER.info("netty初始化成功!"); } catch (InterruptedException e) { e.printStackTrace(); } finally { group.shutdownGracefully(); }
接收udp數據
public BlockingQueue<Map<String, Object>> queue = <br>new LinkedBlockingQueue<Map<String, Object>>(990000); protected void messageReceived(ChannelHandlerContext ctx, DatagramPacket msg) throws Exception { // 因為Netty對UDP進行了封裝,所以接收到的是DatagramPacket對象。 String result = msg.content().toString(CharsetUtil.UTF_8); Map<String, Object> getMap = new HashMap<String, Object>(); //處理數據 <br>queue.put(getMap); <br>ctx.writeAndFlush(new DatagramPacket(<br>Unpooled.copiedBuffer("結果:", CharsetUtil.UTF_8), msg.sender()));
讀取數據存hbase
public void getDate() { LOGGER.info("開始取數據"); List<Map<String, Object>> jsonList = new ArrayList<Map<String, Object>>(); while (true) { Map<String, Object> takeMap = null; try { takeMap = queue.take(); if (takeMap == null) { continue; } jsonList.add(takeMap); if (jsonList.size() == 100) { String httpJson = HbaseUtil.toHttpJson(vo.getTableName(), jsonList); LOGGER.info(httpJson); List<HbaseDataEntity> hbaseDatas =ParseJson.getData(httpJson); HbaseAPI.insertDataList(hbaseDatas); jsonList.clear(); LOGGER.info("hbase存了100條"); } } catch (Exception e) { jsonList.clear(); continue; } } }
- BlockingQueue一定要設置大小,不設置是int最大值,有可能會內存溢出;
- 從BlockingQueue取數據的時候一定要阻塞式取take(),負責會死循環,占CPU100%;
- hbase庫連接時是阻塞式的,如果連接不上會一直阻塞。