雲計算平台(檢索篇)-Elasticsearch-索引篇


Es索引的我們可以理解為數據入庫的一個過程。我們知道Es是基於Lucene框架的一個分布式檢索平台。索引的同樣也是基於Lucene創建的,只不過在其上層做了一些封閉。

         Es的索引過程比較通用的大體上有兩種方式,其一是得用自身Rvier從數據庫中拉數據,當然現在已經有了很多相關插件,Mysql、MDB等數據庫。這種方式可以做到近時實索引,因為River是定時從數據庫拉數據與索引數據進行比對。這種方式經較適合數據有周期的更新。

         下面以Mysql-River  plugins為例:

1、    安裝Mysql-River 插件

bin/plugin -install /path/to/plugin/river-mysql.zip

2、    當安裝好Mysql-River plugin 后,一般可以馬上使用,但建立重新加載Es集群。查看log中是否正確的加載了Mysql-River Plugin(在后面我們講到如何開發相關Plugin)。

3、    配置Es索引與Mysql 數據之間的對應關系。

建立索引(相關Mapping 信息如下:)

curl -XPUT 127.0.0.1:9200/elasticsearchindexname/elasticsearchtypename/_mapping -d

"elasticsearchtypename" : {

                   "_timestamp":{

                            "enabled":true

                   }

}

                   將River索引的配置也提交到Es集群中:

                   curl -XPUT 127.0.0.1:9200/_river/river-mysql/_meta –d

                   {

             "type":"mysql",

                "mysql":{

        "index":"elasticsearchindexname",(索引名稱)

        "type":"elasticsearchtypename",(類型)

        "hostname":"127.0.0.1:3306",(服務器)

        "database":"ESDATA",(數據庫名稱)

        "username":"root",(用戶名)

        "password":"",(密碼)

        "uniqueIdField":"_ID",(標識)

        "query":"select RID,PNAME FROM wf_mds_chn_biaozhun",(SQL語句)

        "deleteOldEntries":"false",

        "interval":"60000"(更新周期)

    }

}

同時你會在Es看到你的索引中開始導數據了,當然些時也會出現一個對應的保存配置的索引,現在很多River都只能索引字段與數據庫的字段一一對應。如果需要個性化定制,可以到Github上下載相關代碼進行修改。我們可以看到只要繼續River(接口)和AbstractRiverComponent(類)便可以進行相關開發了。

public class MysqlRiver extends AbstractRiverComponent implements River

 

         另外一種索引方式當然就是我們把數據Put到Es中去了,最簡單的我們可以用下面命令就完成:

$ curl -XPUT 'http://localhost:9200/twitter/tweet/1' -d '{

    "user" : "kimchy",

    "post_date" : "2009-11-15T14:12:12",

    "message" : "trying out Elastic Search"

}'

對上面的命令解釋一下:

Twitter:索引名稱

Tweet:類型名稱

1:ID值

具體我會在下篇中講解索引名稱和類型的關系,當然-d 后面的就是值了。這是單條Put數據的方式雖然簡單,但是如果說數據量很大的情況下還是不建議用這種方式,可以改用批量導入的方式也就是傳說中的Bluk了,Bluk原量很簡單,我們把數據放到緩存中,一次Put多條數據到Es集群中去,Bluk當然要用代碼實現了,給出一個例子如下:

public static void Index() throws ElasticSearchException, IOException, NumberFormatException, SQLException {

                   // TODO Auto-generated method stub

                   // Node node = nodeBuilder().client(true).node();

                   Settings settings = ImmutableSettings.settingsBuilder()

                                     .put("cluster.name", "elasticsearch_wf").build();

                   Client client = new TransportClient(settings)

                                     .addTransportAddress(new InetSocketTransportAddress(

                                                        "168.160.200.250", 9300));

                  

                   ////������е��ܼ�¼��ֳ�5000��һ�����ѯ

                   int countRe=100000000; //MySqlClass.getCount("select count(*) from test");

                   if(countRe>0)

                   {

                            int readercount=1;

                            if(countRe>5000)

                            {

                                     readercount=countRe%5000==0?countRe/5000:countRe/5000+1;

                            }

                           

                            ////ÿ�ζ�ȡ5000���¼

                            for(int j=0;j<readercount;j++)

                            {

                                     ResultSet rs = MySqlClass.executeQuery("select * from test");

                                     BulkRequestBuilder bulkRequest = client.prepareBulk();

                                     try {

 

                                               if (rs != null) {

                                                        int i = 1;

                                                        while (rs.next()) {

                                                                 bulkRequest.add(client.prepareIndex("qtest", String.valueOf(i++)).setSource(

                                                                                    jsonBuilder().startObject()

                                                                                                       .field("id", rs.getInt("id"))

                                                                                                       .field("�й�", rs.getString("title"))

                                                                                                       .field("AB_EN", rs.getString("descript"))

                                                                                                       .field("AF_CN",rs.getString("text"))

                                                                                                       .endObject()));

                                                        }

                                                        BulkResponse bulkResponse = bulkRequest.execute().actionGet();

                                                        if (bulkResponse.hasFailures()) {

                                                                 /* has Failures handler Error */

                                                        }

                                               }

                                     } catch (Exception e) {

                                               e.printStackTrace();

                                     }

                            }

                   }

                   client.close();

         }

上面只是一個簡單的例子,大量可以考慮用從線程方式,另外Client鏈接數其實還是比較占資源的,大家可以考慮將出封閉到一個鏈接池中,提供效率。

         整個建索引的過程Es在Lucene的基礎上還是做了很多的優化,但主體上我們對應到Lucene里面基實就是如下代碼:

         public class Index {

         private IndexWriter writer = null;

         private static Analyzer ANALYZER = new IKAnalyzer();

         private String FilePath = null;

 

         public Index(String FilePath, String IndexPath) {

                   try {

                            IndexWriterConfig writerConfig = new IndexWriterConfig(

                                               Version.LUCENE_36, ANALYZER);

                            this.writer = new IndexWriter(

                                               FSDirectory.open(new File(IndexPath)), writerConfig);

                            this.FilePath = FilePath;

                   } catch (Exception e) {

                            e.printStackTrace();

                   }

         }

 

         /*

          * Init Create Index

          */

         public void Init() {

                   try {

                            if (FilePath.length() > 0) {

                                     // 讀目錄中txt文件

                                     File file = new File(FilePath);

                                     List<File> files = new ArrayList<File>();

                                     this.ListAllFile(file, files);

 

                                     // //將File轉換為 Document對象

                                     for (File sfs : files) {

                                               this.writer.addDocument(this.getDocument(sfs));

                                     }

                            }

                   } catch (Exception e) {

                            e.printStackTrace();

                   }

         }

 

         /*

          * Close Index

          */

         public void Close() {

                   try {

                            this.writer.commit();

                            this.writer.close();

                   } catch (Exception e) {

                            e.printStackTrace();

                   }

         }

 

         /*

          * 獲取所有txt文件

          */

         private List<File> ListAllFile(File fileOrDir, List<File> files)

                            throws Exception {

                   if (fileOrDir != null && files != null) {

                            if (fileOrDir.isDirectory()) {

                                     File[] fs = fileOrDir.listFiles();

                                     for (File sfs : fs) {

                                               if (sfs.isDirectory())

                                                        this.ListAllFile(sfs, files);

                                               else files.add(sfs);

                                     }

                            } else {

                                     files.add(fileOrDir);

                            }

                   }

                   return null;

         }

 

         /*

          * Get Document

          */

         private Document getDocument(File f) throws Exception {

                   Document doc = new Document();

                   FileInputStream  is = new FileInputStream(f);

                   byte[] buf = new byte[is.available()];

                   is.read(buf);

                   String contentStr = new String(buf,"GBK");

                   Field content = new Field("content", contentStr, Field.Store.YES,

                                     Field.Index.ANALYZED);

                   doc.add(content);

                   Field path = new Field("path", f.getAbsolutePath(), Field.Store.YES,

                                     Field.Index.ANALYZED);

                   Field size=new Field("size",String.valueOf(f.getTotalSpace()),Field.Store.YES,Field.Index.NOT_ANALYZED);

                   doc.add(size);

                   Random rm=new Random();

                   int year=rm.nextInt(20);

                   Field time=new Field("time",String.valueOf(1990+year),Field.Store.YES,Field.Index.NOT_ANALYZED);

                  doc.add(time);

                   doc.add(path);

                   is.close();

                   return doc;

         }

}


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM