Es索引的我們可以理解為數據入庫的一個過程。我們知道Es是基於Lucene框架的一個分布式檢索平台。索引的同樣也是基於Lucene創建的,只不過在其上層做了一些封閉。
Es的索引過程比較通用的大體上有兩種方式,其一是得用自身Rvier從數據庫中拉數據,當然現在已經有了很多相關插件,Mysql、MDB等數據庫。這種方式可以做到近時實索引,因為River是定時從數據庫拉數據與索引數據進行比對。這種方式經較適合數據有周期的更新。
下面以Mysql-River plugins為例:
1、 安裝Mysql-River 插件
bin/plugin -install /path/to/plugin/river-mysql.zip
2、 當安裝好Mysql-River plugin 后,一般可以馬上使用,但建立重新加載Es集群。查看log中是否正確的加載了Mysql-River Plugin(在后面我們講到如何開發相關Plugin)。
3、 配置Es索引與Mysql 數據之間的對應關系。
建立索引(相關Mapping 信息如下:)
curl -XPUT 127.0.0.1:9200/elasticsearchindexname/elasticsearchtypename/_mapping -d
"elasticsearchtypename" : {
"_timestamp":{
"enabled":true
}
}
將River索引的配置也提交到Es集群中:
curl -XPUT 127.0.0.1:9200/_river/river-mysql/_meta –d
{
"type":"mysql",
"mysql":{
"index":"elasticsearchindexname",(索引名稱)
"type":"elasticsearchtypename",(類型)
"hostname":"127.0.0.1:3306",(服務器)
"database":"ESDATA",(數據庫名稱)
"username":"root",(用戶名)
"password":"",(密碼)
"uniqueIdField":"_ID",(標識)
"query":"select RID,PNAME FROM wf_mds_chn_biaozhun",(SQL語句)
"deleteOldEntries":"false",
"interval":"60000"(更新周期)
}
}
同時你會在Es看到你的索引中開始導數據了,當然些時也會出現一個對應的保存配置的索引,現在很多River都只能索引字段與數據庫的字段一一對應。如果需要個性化定制,可以到Github上下載相關代碼進行修改。我們可以看到只要繼續River(接口)和AbstractRiverComponent(類)便可以進行相關開發了。
public class MysqlRiver extends AbstractRiverComponent implements River
另外一種索引方式當然就是我們把數據Put到Es中去了,最簡單的我們可以用下面命令就完成:
$ curl -XPUT 'http://localhost:9200/twitter/tweet/1' -d '{
"user" : "kimchy",
"post_date" : "2009-11-15T14:12:12",
"message" : "trying out Elastic Search"
}'
對上面的命令解釋一下:
Twitter:索引名稱
Tweet:類型名稱
1:ID值
具體我會在下篇中講解索引名稱和類型的關系,當然-d 后面的就是值了。這是單條Put數據的方式雖然簡單,但是如果說數據量很大的情況下還是不建議用這種方式,可以改用批量導入的方式也就是傳說中的Bluk了,Bluk原量很簡單,我們把數據放到緩存中,一次Put多條數據到Es集群中去,Bluk當然要用代碼實現了,給出一個例子如下:
public static void Index() throws ElasticSearchException, IOException, NumberFormatException, SQLException {
// TODO Auto-generated method stub
// Node node = nodeBuilder().client(true).node();
Settings settings = ImmutableSettings.settingsBuilder()
.put("cluster.name", "elasticsearch_wf").build();
Client client = new TransportClient(settings)
.addTransportAddress(new InetSocketTransportAddress(
"168.160.200.250", 9300));
////������е��ܼ�¼��ֳ�5000��һ�����ѯ
int countRe=100000000; //MySqlClass.getCount("select count(*) from test");
if(countRe>0)
{
int readercount=1;
if(countRe>5000)
{
readercount=countRe%5000==0?countRe/5000:countRe/5000+1;
}
////ÿ�ζ�ȡ5000���¼
for(int j=0;j<readercount;j++)
{
ResultSet rs = MySqlClass.executeQuery("select * from test");
BulkRequestBuilder bulkRequest = client.prepareBulk();
try {
if (rs != null) {
int i = 1;
while (rs.next()) {
bulkRequest.add(client.prepareIndex("qtest", String.valueOf(i++)).setSource(
jsonBuilder().startObject()
.field("id", rs.getInt("id"))
.field("�й�", rs.getString("title"))
.field("AB_EN", rs.getString("descript"))
.field("AF_CN",rs.getString("text"))
.endObject()));
}
BulkResponse bulkResponse = bulkRequest.execute().actionGet();
if (bulkResponse.hasFailures()) {
/* has Failures handler Error */
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
client.close();
}
上面只是一個簡單的例子,大量可以考慮用從線程方式,另外Client鏈接數其實還是比較占資源的,大家可以考慮將出封閉到一個鏈接池中,提供效率。
整個建索引的過程Es在Lucene的基礎上還是做了很多的優化,但主體上我們對應到Lucene里面基實就是如下代碼:
public class Index {
private IndexWriter writer = null;
private static Analyzer ANALYZER = new IKAnalyzer();
private String FilePath = null;
public Index(String FilePath, String IndexPath) {
try {
IndexWriterConfig writerConfig = new IndexWriterConfig(
Version.LUCENE_36, ANALYZER);
this.writer = new IndexWriter(
FSDirectory.open(new File(IndexPath)), writerConfig);
this.FilePath = FilePath;
} catch (Exception e) {
e.printStackTrace();
}
}
/*
* Init Create Index
*/
public void Init() {
try {
if (FilePath.length() > 0) {
// 讀目錄中txt文件
File file = new File(FilePath);
List<File> files = new ArrayList<File>();
this.ListAllFile(file, files);
// //將File轉換為 Document對象
for (File sfs : files) {
this.writer.addDocument(this.getDocument(sfs));
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
/*
* Close Index
*/
public void Close() {
try {
this.writer.commit();
this.writer.close();
} catch (Exception e) {
e.printStackTrace();
}
}
/*
* 獲取所有txt文件
*/
private List<File> ListAllFile(File fileOrDir, List<File> files)
throws Exception {
if (fileOrDir != null && files != null) {
if (fileOrDir.isDirectory()) {
File[] fs = fileOrDir.listFiles();
for (File sfs : fs) {
if (sfs.isDirectory())
this.ListAllFile(sfs, files);
else files.add(sfs);
}
} else {
files.add(fileOrDir);
}
}
return null;
}
/*
* Get Document
*/
private Document getDocument(File f) throws Exception {
Document doc = new Document();
FileInputStream is = new FileInputStream(f);
byte[] buf = new byte[is.available()];
is.read(buf);
String contentStr = new String(buf,"GBK");
Field content = new Field("content", contentStr, Field.Store.YES,
Field.Index.ANALYZED);
doc.add(content);
Field path = new Field("path", f.getAbsolutePath(), Field.Store.YES,
Field.Index.ANALYZED);
Field size=new Field("size",String.valueOf(f.getTotalSpace()),Field.Store.YES,Field.Index.NOT_ANALYZED);
doc.add(size);
Random rm=new Random();
int year=rm.nextInt(20);
Field time=new Field("time",String.valueOf(1990+year),Field.Store.YES,Field.Index.NOT_ANALYZED);
doc.add(time);
doc.add(path);
is.close();
return doc;
}
}