大數據架構-使用HBase和Solr將存儲與索引放在不同的機器上
摘要:
HBase可以通過協處理器
Coprocessor
的方式向Solr發出請求,Solr對於接收到的數據可以做相關的同步:增、刪、改索引的操作,這樣就可以同時使用HBase存儲量大和Solr檢索性能高的優點了,更何況HBase和Solr都可以集群。這對海量數據存儲、檢索提供了一種方式,將存儲與索引放在不同的機器上,是大數據
架構的必須品。
關鍵詞:
HBase,
Solr,
Coprocessor
,
大數據
,
架構
正如我的之前的博客“
Solr與HBase架構設計
”http://http://www.cnblogs.com/wgp13x/p/a8bb8ccd469c96917652201007ad3c50.html中所述,HBase和Solr可以通過協處理器
Coprocessor
的方式向Solr發出請求,Solr對於接收到的數據可以做相關的同步:增、刪、改索引的操作。將存儲與索引放在不同的機器上,這是大數據架構的必須品,但目前還有很多不懂得此道的同學,他們對於這種思想感到很新奇,不過,這絕對是好的方向,所以不懂得抓緊學習吧。
有個朋友給我的那篇博客留言,說CDH也可以做這樣的事情,我還沒有試過,他還問我要與此相關的代碼,於是我就稍微整理了一下,作為本篇文章的主要內容。關於CDH的事,我會盡快嘗試,有知道的同學可以給我留言。
下面我主要講述一下,我測試對HBase和Solr的性能時,使用HBase
協處理器向HBase添加數據所編寫的相關代碼,及解釋說明。
一、編寫HBase協處理器Coprocessor
一旦有數據postPut,就立即對Solr里相應的Core更新。這里使用了
ConcurrentUpdateSolrServer,它是Solr速率性能的保證,使用它不要忘記在Solr里面配置autoCommit喲。
/*
*版權:王安琪
*描述:監視HBase,一有數據postPut就向Solr發送,本類要作為觸發器添加到HBase
*修改時間:2014-05-27
*修改內容:新增
*/
package solrHbase.test;
import java.io.UnsupportedEncodingException;
import ***;
public class SorlIndexCoprocessorObserver extends BaseRegionObserver {
private static final Logger LOG = LoggerFactory
.getLogger(SorlIndexCoprocessorObserver.class);
private static final String solrUrl = "http://192.1.11.108:80/solr/core1";
private static final SolrServer solrServer = new ConcurrentUpdateSolrServer(
solrUrl, 10000, 20);
/**
* 建立solr索引
*
* @throws UnsupportedEncodingException
*/
@Override
public void postPut(final ObserverContext<RegionCoprocessorEnvironment> e,
final Put put, final WALEdit edit, final boolean writeToWAL)
throws UnsupportedEncodingException {
inputSolr(put);
}
public void inputSolr(Put put) {
try {
solrServer.add(TestSolrMain.getInputDoc(put));
} catch (Exception ex) {
LOG.error(ex.getMessage());
}
}
}
|
注意:getInputDoc是這個HBase協處理器Coprocessor的精髓所在,它可以把HBase內的Put里的內容轉化成Solr需要的值。其中
String fieldName = key.substring(key.indexOf(
columnFamily
) + 3,
key.indexOf(
"我在這"
)).trim();
這里有一個亂碼字符,在這里看不到,請大家注意一下。
public static SolrInputDocument getInputDoc(Put put) {
SolrInputDocument doc = new SolrInputDocument();
doc.addField("test_ID", Bytes.toString(put.getRow()));
for (KeyValue c : put.getFamilyMap().get(Bytes.toBytes(columnFamily))) {
String key = Bytes.toString(c.getKey());
String value = Bytes.toString(c.getValue());
if (value.isEmpty()) {
continue;
}
String fieldName = key.substring(key.indexOf(columnFamily) + 3,
key.indexOf("")).trim();
doc.addField(fieldName, value);
}
return doc;
} |
二、編寫測試程序入口代碼main
這段代碼向HBase請求建了一張表,並將模擬的數據,向HBase連續地提交數據內容,在HBase中不斷地插入數據,同時記錄時間,測試插入性能。
/*
*版權:王安琪
*描述:測試HBaseInsert,HBase插入性能
*修改時間:2014-05-27
*修改內容:新增
*/
package solrHbase.test;
import hbaseInput.HbaseInsert;
import ***;
public class TestHBaseMain {
private static Configuration config;
private static String tableName = "angelHbase";
private static HTable table = null;
private static final String columnFamily = "wanganqi";
/**
* @param args
*/
public static void main(String[] args) {
config = HBaseConfiguration.create();
config.set("hbase.zookeeper.quorum", "192.103.101.104");
HbaseInsert.createTable(config, tableName, columnFamily);
try {
table = new HTable(config, Bytes.toBytes(tableName));
for (int k = 0; k < 1; k++) {
Thread t = new Thread() {
public void run() {
for (int i = 0; i < 100000; i++) {
HbaseInsert.inputData(table,
PutCreater.createPuts(1000, columnFamily));
Calendar c = Calendar.getInstance();
String dateTime = c.get(Calendar.YEAR) + "-"
+ c.get(Calendar.MONTH) + "-"
+ c.get(Calendar.DATE) + "T"
+ c.get(Calendar.HOUR) + ":"
+ c.get(Calendar.MINUTE) + ":"
+ c.get(Calendar.SECOND) + ":"
+ c.get(Calendar.MILLISECOND) + "Z 寫入: "
+ i * 1000;
System.out.println(dateTime);
}
}
};
t.start();
}
} catch (IOException e1) {
e1.printStackTrace();
}
}
}
|
下面的是與HBase相關的操作,把它封裝到一個類中,這里就只有建表與插入數據的相關代碼。
/*
*版權:王安琪
*描述:與HBase相關操作,建表與插入數據
*修改時間:2014-05-27
*修改內容:新增
*/
package hbaseInput;
import ***;
import org.apache.hadoop.hbase.client.Put;
public class HbaseInsert {
public static void createTable(Configuration config, String tableName,
String columnFamily) {
HBaseAdmin hBaseAdmin;
try {
hBaseAdmin = new HBaseAdmin(config);
if (hBaseAdmin.tableExists(tableName)) {
return;
}
HTableDescriptor tableDescriptor = new HTableDescriptor(tableName);
tableDescriptor.addFamily(new HColumnDescriptor(columnFamily));
hBaseAdmin.createTable(tableDescriptor);
hBaseAdmin.close();
} catch (MasterNotRunningException e) {
e.printStackTrace();
} catch (ZooKeeperConnectionException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}
}
public static void inputData(HTable table, ArrayList<Put> puts) {
try {
table.put(puts);
table.flushCommits();
puts.clear();
} catch (IOException e) {
e.printStackTrace();
}
}
}
|
三、編寫模擬數據Put
向HBase中寫入數據需要構造Put,下面是我構造模擬數據Put的方式,有字符串的生成,我是由mmseg提供的詞典
words.dic
中隨機讀取一些詞語連接起來,生成一句字符串的,下面的代碼沒有體現,不過很easy,你自己造你自己想要的數據就OK了。
public static Put createPut(String columnFamily) {
String ss = getSentence();
byte
[] family = Bytes.
toBytes
(columnFamily);
byte[] rowKey = Bytes.toBytes("" + Math.abs(r.nextLong()));
Put put = new Put(rowKey);
put.add(family, Bytes.toBytes("DeviceID"),
Bytes.toBytes("" + Math.abs(r.nextInt())));
******
put.add(family, Bytes.toBytes("
Company_mmsegsm
"), Bytes.toBytes("ss"));
return put;
} |
當然在運行上面這個程序之前,需要先在Solr里面配置好你需要的列信息,HBase、Solr安裝與配置,它們的基礎使用方法將會在之后的文章中介紹
。在這里,Solr的列配置就跟你使用createPut生成的Put搞成一樣的列名就行了,當然也可以使用動態列的形式。

四、直接對Solr性能測試
如果你不想對HBase與Solr的相結合進行測試,只想單獨對Solr的性能進行測試,這就更簡單了,完全可以利用上面的代碼段來測試,稍微組裝一下就可以了。
private static void sendConcurrentUpdateSolrServer(final String url,
final int count) throws SolrServerException, IOException {
SolrServer solrServer = new ConcurrentUpdateSolrServer(url, 10000, 20);
for (int i = 0; i < count; i++) {
solrServer.add(getInputDoc(PutCreater.createPut(columnFamily)));
}
} |
希望可以幫助到你規格嚴格-功夫到家。這次的文章代碼又偏多了點,但代碼是解釋思想的最好的語言,我的提倡就是盡可能的減少代碼的注釋,盡力簡化你的代碼,使你的代碼足夠的清晰易懂,甚至於相似於偽代碼了,這也是《重構》這本書里所提倡的。