一、引言:
上篇文章提起關於HBase插入性能優化設計到的五個參數,從參數配置的角度給大家提供了一個性能測試環境的實驗代碼。根據網友的反饋,基於單線程的模式實現的數據插入畢竟有限。通過個人實測,在我的虛擬機環境下,單線程插入數據的值約為4w/s。集群指標是:CPU雙核1.83,虛擬機512M內存,集群部署單點模式。本文給出了基於多線程並發模式的,測試代碼案例和實測結果,希望能給大家一些啟示:
二、源程序:
1 import org.apache.hadoop.conf.Configuration; 2 import org.apache.hadoop.hbase.HBaseConfiguration; 3 import java.io.BufferedReader; 4 import java.io.File; 5 import java.io.FileNotFoundException; 6 import java.io.FileReader; 7 import java.io.IOException; 8 import java.util.ArrayList; 9 import java.util.List; 10 import java.util.Random; 11 12 import org.apache.hadoop.conf.Configuration; 13 import org.apache.hadoop.hbase.HBaseConfiguration; 14 import org.apache.hadoop.hbase.client.HBaseAdmin; 15 import org.apache.hadoop.hbase.client.HTable; 16 import org.apache.hadoop.hbase.client.HTableInterface; 17 import org.apache.hadoop.hbase.client.HTablePool; 18 import org.apache.hadoop.hbase.client.Put; 19 20 public class HBaseImportEx { 21 static Configuration hbaseConfig = null; 22 public static HTablePool pool = null; 23 public static String tableName = "T_TEST_1"; 24 static{ 25 //conf = HBaseConfiguration.create(); 26 Configuration HBASE_CONFIG = new Configuration(); 27 HBASE_CONFIG.set("hbase.master", "192.168.230.133:60000"); 28 HBASE_CONFIG.set("hbase.zookeeper.quorum", "192.168.230.133"); 29 HBASE_CONFIG.set("hbase.zookeeper.property.clientPort", "2181"); 30 hbaseConfig = HBaseConfiguration.create(HBASE_CONFIG); 31 32 pool = new HTablePool(hbaseConfig, 1000); 33 } 34 /* 35 * Insert Test single thread 36 * */ 37 public static void SingleThreadInsert()throws IOException 38 { 39 System.out.println("---------開始SingleThreadInsert測試----------"); 40 long start = System.currentTimeMillis(); 41 //HTableInterface table = null; 42 HTable table = null; 43 table = (HTable)pool.getTable(tableName); 44 table.setAutoFlush(false); 45 table.setWriteBufferSize(24*1024*1024); 46 //構造測試數據 47 List<Put> list = new ArrayList<Put>(); 48 int count = 10000; 49 byte[] buffer = new byte[350]; 50 Random rand = new Random(); 51 for(int i=0;i<count;i++) 52 { 53 Put put = new Put(String.format("row %d",i).getBytes()); 54 rand.nextBytes(buffer); 55 put.add("f1".getBytes(), null, buffer); 56 //wal=false 57 put.setWriteToWAL(false); 58 list.add(put); 59 if(i%10000 == 0) 60 { 61 table.put(list); 62 list.clear(); 63 table.flushCommits(); 64 } 65 } 66 long stop = System.currentTimeMillis(); 67 //System.out.println("WAL="+wal+",autoFlush="+autoFlush+",buffer="+writeBuffer+",count="+count); 68 69 System.out.println("插入數據:"+count+"共耗時:"+ (stop - start)*1.0/1000+"s"); 70 71 System.out.println("---------結束SingleThreadInsert測試----------"); 72 } 73 /* 74 * 多線程環境下線程插入函數 75 * 76 * */ 77 public static void InsertProcess()throws IOException 78 { 79 long start = System.currentTimeMillis(); 80 //HTableInterface table = null; 81 HTable table = null; 82 table = (HTable)pool.getTable(tableName); 83 table.setAutoFlush(false); 84 table.setWriteBufferSize(24*1024*1024); 85 //構造測試數據 86 List<Put> list = new ArrayList<Put>(); 87 int count = 10000; 88 byte[] buffer = new byte[256]; 89 Random rand = new Random(); 90 for(int i=0;i<count;i++) 91 { 92 Put put = new Put(String.format("row %d",i).getBytes()); 93 rand.nextBytes(buffer); 94 put.add("f1".getBytes(), null, buffer); 95 //wal=false 96 put.setWriteToWAL(false); 97 list.add(put); 98 if(i%10000 == 0) 99 { 100 table.put(list); 101 list.clear(); 102 table.flushCommits(); 103 } 104 } 105 long stop = System.currentTimeMillis(); 106 //System.out.println("WAL="+wal+",autoFlush="+autoFlush+",buffer="+writeBuffer+",count="+count); 107 108 System.out.println("線程:"+Thread.currentThread().getId()+"插入數據:"+count+"共耗時:"+ (stop - start)*1.0/1000+"s"); 109 } 110 111 112 /* 113 * Mutil thread insert test 114 * */ 115 public static void MultThreadInsert() throws InterruptedException 116 { 117 System.out.println("---------開始MultThreadInsert測試----------"); 118 long start = System.currentTimeMillis(); 119 int threadNumber = 10; 120 Thread[] threads=new Thread[threadNumber]; 121 for(int i=0;i<threads.length;i++) 122 { 123 threads[i]= new ImportThread(); 124 threads[i].start(); 125 } 126 for(int j=0;j< threads.length;j++) 127 { 128 (threads[j]).join(); 129 } 130 long stop = System.currentTimeMillis(); 131 132 System.out.println("MultThreadInsert:"+threadNumber*10000+"共耗時:"+ (stop - start)*1.0/1000+"s"); 133 System.out.println("---------結束MultThreadInsert測試----------"); 134 } 135 136 /** 137 * @param args 138 */ 139 public static void main(String[] args) throws Exception{ 140 // TODO Auto-generated method stub 141 //SingleThreadInsert(); 142 MultThreadInsert(); 143 144 145 } 146 147 public static class ImportThread extends Thread{ 148 public void HandleThread() 149 { 150 //this.TableName = "T_TEST_1"; 151 152 153 } 154 // 155 public void run(){ 156 try{ 157 InsertProcess(); 158 } 159 catch(IOException e){ 160 e.printStackTrace(); 161 }finally{ 162 System.gc(); 163 } 164 } 165 } 166 167 }
三、說明
1.線程數設置需要根據本集群硬件參數,實際測試得出。否則線程過多的情況下,總耗時反而是下降的。
2.單筆提交數對性能的影響非常明顯,需要在自己的環境下,找到最理想的數值,這個需要與單條記錄的字節數相關。
四、測試結果
---------開始MultThreadInsert測試----------
線程:8插入數據:10000共耗時:1.328s
線程:16插入數據:10000共耗時:1.562s
線程:11插入數據:10000共耗時:1.562s
線程:10插入數據:10000共耗時:1.812s
線程:13插入數據:10000共耗時:2.0s
線程:17插入數據:10000共耗時:2.14s
線程:14插入數據:10000共耗時:2.265s
線程:9插入數據:10000共耗時:2.468s
線程:15插入數據:10000共耗時:2.562s
線程:12插入數據:10000共耗時:2.671s
MultThreadInsert:100000共耗時:2.703s
---------結束MultThreadInsert測試----------
備注:該技術專題討論正在群Hadoop高級交流群:293503507同步直播中,敬請關注。