大數據應用之HBase數據插入性能優化之多線程並行插入測試案例


一、引言:

  上篇文章提起關於HBase插入性能優化設計到的五個參數,從參數配置的角度給大家提供了一個性能測試環境的實驗代碼。根據網友的反饋,基於單線程的模式實現的數據插入畢竟有限。通過個人實測,在我的虛擬機環境下,單線程插入數據的值約為4w/s。集群指標是:CPU雙核1.83,虛擬機512M內存,集群部署單點模式。本文給出了基於多線程並發模式的,測試代碼案例和實測結果,希望能給大家一些啟示:

二、源程序:

  1 import org.apache.hadoop.conf.Configuration;
  2 import org.apache.hadoop.hbase.HBaseConfiguration;
  3 import java.io.BufferedReader;
  4 import java.io.File;
  5 import java.io.FileNotFoundException;
  6 import java.io.FileReader;
  7 import java.io.IOException;
  8 import java.util.ArrayList;
  9 import java.util.List;
 10 import java.util.Random;
 11 
 12 import org.apache.hadoop.conf.Configuration;
 13 import org.apache.hadoop.hbase.HBaseConfiguration;
 14 import org.apache.hadoop.hbase.client.HBaseAdmin;
 15 import org.apache.hadoop.hbase.client.HTable;
 16 import org.apache.hadoop.hbase.client.HTableInterface;
 17 import org.apache.hadoop.hbase.client.HTablePool;
 18 import org.apache.hadoop.hbase.client.Put;
 19 
 20 public class HBaseImportEx {
 21     static Configuration hbaseConfig = null;
 22     public static HTablePool pool = null;
 23     public static String tableName = "T_TEST_1";
 24     static{
 25          //conf = HBaseConfiguration.create();
 26          Configuration HBASE_CONFIG = new Configuration();
 27          HBASE_CONFIG.set("hbase.master", "192.168.230.133:60000");
 28          HBASE_CONFIG.set("hbase.zookeeper.quorum", "192.168.230.133");
 29          HBASE_CONFIG.set("hbase.zookeeper.property.clientPort", "2181");
 30          hbaseConfig = HBaseConfiguration.create(HBASE_CONFIG);
 31          
 32          pool = new HTablePool(hbaseConfig, 1000); 
 33     }
 34     /*
 35      * Insert Test single thread
 36      * */
 37     public static void SingleThreadInsert()throws IOException
 38     {
 39         System.out.println("---------開始SingleThreadInsert測試----------");
 40         long start = System.currentTimeMillis();
 41         //HTableInterface table = null;
 42         HTable table = null;
 43         table = (HTable)pool.getTable(tableName);
 44         table.setAutoFlush(false);
 45         table.setWriteBufferSize(24*1024*1024);
 46         //構造測試數據
 47         List<Put> list = new ArrayList<Put>();
 48         int count = 10000;
 49         byte[] buffer = new byte[350];
 50         Random rand = new Random();
 51         for(int i=0;i<count;i++)
 52         {
 53             Put put = new Put(String.format("row %d",i).getBytes());
 54             rand.nextBytes(buffer);
 55             put.add("f1".getBytes(), null, buffer);
 56             //wal=false
 57             put.setWriteToWAL(false);
 58             list.add(put);    
 59             if(i%10000 == 0)
 60             {
 61                 table.put(list);
 62                 list.clear();    
 63                 table.flushCommits();
 64             }            
 65         }
 66         long stop = System.currentTimeMillis();
 67         //System.out.println("WAL="+wal+",autoFlush="+autoFlush+",buffer="+writeBuffer+",count="+count);
 68           
 69         System.out.println("插入數據:"+count+"共耗時:"+ (stop - start)*1.0/1000+"s");
 70         
 71         System.out.println("---------結束SingleThreadInsert測試----------");
 72     }
 73     /*
 74      * 多線程環境下線程插入函數 
 75      * 
 76      * */
 77     public static void InsertProcess()throws IOException
 78     {
 79         long start = System.currentTimeMillis();
 80         //HTableInterface table = null;
 81         HTable table = null;
 82         table = (HTable)pool.getTable(tableName);
 83         table.setAutoFlush(false);
 84         table.setWriteBufferSize(24*1024*1024);
 85         //構造測試數據
 86         List<Put> list = new ArrayList<Put>();
 87         int count = 10000;
 88         byte[] buffer = new byte[256];
 89         Random rand = new Random();
 90         for(int i=0;i<count;i++)
 91         {
 92             Put put = new Put(String.format("row %d",i).getBytes());
 93             rand.nextBytes(buffer);
 94             put.add("f1".getBytes(), null, buffer);
 95             //wal=false
 96             put.setWriteToWAL(false);
 97             list.add(put);    
 98             if(i%10000 == 0)
 99             {
100                 table.put(list);
101                 list.clear();    
102                 table.flushCommits();
103             }            
104         }
105         long stop = System.currentTimeMillis();
106         //System.out.println("WAL="+wal+",autoFlush="+autoFlush+",buffer="+writeBuffer+",count="+count);
107           
108         System.out.println("線程:"+Thread.currentThread().getId()+"插入數據:"+count+"共耗時:"+ (stop - start)*1.0/1000+"s");
109     }
110     
111     
112     /*
113      * Mutil thread insert test
114      * */
115     public static void MultThreadInsert() throws InterruptedException
116     {
117         System.out.println("---------開始MultThreadInsert測試----------");
118         long start = System.currentTimeMillis();
119         int threadNumber = 10;
120         Thread[] threads=new Thread[threadNumber];
121         for(int i=0;i<threads.length;i++)
122         {
123             threads[i]= new ImportThread();
124             threads[i].start();            
125         }
126         for(int j=0;j< threads.length;j++)
127         {
128              (threads[j]).join();
129         }
130         long stop = System.currentTimeMillis();
131           
132         System.out.println("MultThreadInsert:"+threadNumber*10000+"共耗時:"+ (stop - start)*1.0/1000+"s");        
133         System.out.println("---------結束MultThreadInsert測試----------");
134     }    
135 
136     /**
137      * @param args
138      */
139     public static void main(String[] args)  throws Exception{
140         // TODO Auto-generated method stub
141         //SingleThreadInsert();        
142         MultThreadInsert();
143         
144         
145     }
146     
147     public static class ImportThread extends Thread{
148         public void HandleThread()
149         {                        
150             //this.TableName = "T_TEST_1";
151         
152             
153         }
154         //
155         public void run(){
156             try{
157                 InsertProcess();            
158             }
159             catch(IOException e){
160                 e.printStackTrace();                
161             }finally{
162                 System.gc();
163                 }
164             }            
165         }
166 
167 }

三、說明

1.線程數設置需要根據本集群硬件參數,實際測試得出。否則線程過多的情況下,總耗時反而是下降的。

2.單筆提交數對性能的影響非常明顯,需要在自己的環境下,找到最理想的數值,這個需要與單條記錄的字節數相關。

四、測試結果

---------開始MultThreadInsert測試----------

線程:8插入數據:10000共耗時:1.328s
線程:16插入數據:10000共耗時:1.562s
線程:11插入數據:10000共耗時:1.562s
線程:10插入數據:10000共耗時:1.812s
線程:13插入數據:10000共耗時:2.0s
線程:17插入數據:10000共耗時:2.14s
線程:14插入數據:10000共耗時:2.265s
線程:9插入數據:10000共耗時:2.468s
線程:15插入數據:10000共耗時:2.562s
線程:12插入數據:10000共耗時:2.671s
MultThreadInsert:100000共耗時:2.703s
---------結束MultThreadInsert測試----------

 備注:該技術專題討論正在群Hadoop高級交流群:293503507同步直播中,敬請關注。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM