用elasticsearch索引mongodb數據


參照網頁:單機搭建elasticsearch和mongodb的river

三個步驟:

一,搭建單機replicSet
二,安裝mongodb-river插件
三,創建meta,驗證使用

第一步,搭建單機mongodb的replSet

1,配置/etc/mongodb.conf
增加兩個配置:

replSet=rs0 #這里是指定replSet的名字 
oplogSize=100 #這里是指定oplog表數據大小(太大了不支持)

啟動mongodb:bin/mongod --fork --logpath /data/db/mongodb.log -f /etc/mongodb.conf

2,初始化replicSet

root# bin/mongo
>rs.initiate( {"_id" : "rs0", "version" : 1, "members" : [ { "_id" : 0, "host" : "127.0.0.1:27017" } ]}) 

3,搭建好replicSet之后,退出mongo shell重新登錄,提示符會變成:

rs0:PRIMARY>

第二步, 安裝mongodb-river插件

插件項目:https://github.com/richardwilly98/elasticsearch-river-mongodb
安裝插件命令:

bin/plugin --install com.github.richardwilly98.elasticsearch/elasticsearch-river-mongodb/2.0.0

完畢后啟動elasticsearch,正常會顯示如下提示信息:

root# bin/elasticsearch

...
[2014-03-14 19:28:34,179][INFO ][plugins] [Super Rabbit] loaded [mongodb-river], sites [river-mongodb]
[2014-03-14 19:28:41,032][INFO ][org.elasticsearch.river.mongodb.MongoDBRiver] Starting river mongodb_test
[2014-03-14 19:28:41,087][INFO ][org.elasticsearch.river.mongodb.MongoDBRiver] MongoDB River Plugin - version[2.0.0] - hash[a0c23f1] - time[2014-02-23T20:40:05Z]
[2014-03-14 19:28:41,087][INFO ][org.elasticsearch.river.mongodb.MongoDBRiver] starting mongodb stream. options: secondaryreadpreference [false], drop_collection [false], include_collection [], throttlesize [5000], gridfs [false], filter [null], db [test], collection [page], script [null], indexing to [test]/[page]
[2014-03-14 19:28:41,303][INFO ][org.elasticsearch.river.mongodb.MongoDBRiver] MongoDB version - 2.2.7

第三步,創建meta信息

1,創建mongodb連接

root# curl -XPUT "localhost:9200/_river/mongodb_mytest/_meta" -d ' 
> {
> "type": "mongodb", 
> "mongodb": { 
> "host": "localhost", 
> "port": "27017", 
> "db": "testdb", 
> "collection": "testcollection" 
> }, 
> "index": { 
> "name": "testdbindex", 
> "type": "testcollection"} }'
{"_index":"_river","_type":"mongodb_mytest","_id":"_meta","_version":1,"created":true}'
返回created為true,表示創建成功,也可通過curl "http://localhost:9200/_river/mongodb_mytest/_meta"查看

主要分為三個部分:

type:river的類型,也就是“mongodb”
mongodb:mongodb的連接信息
index:elastisearch中用於接收mongodb數據的索引index和“type”。

其中mongodb_mytest為${es.river.name},每個索引名稱都不一樣,如果重復插入會導致索引被覆蓋的問題。

2,往mongodb插入數據

rs0:PRIMARY> db.testcollection.save({name:"stone"})

3,自定義查詢

root# curl -XGET 'http://localhost:9200/testdbindex/_search?q=name:stone'
{"took":2,"timed_out":false,"_shards":{"total":5,"successful":5,"failed":0},"hits":{"total":1,"max_score":0.30685282,"hits":[{"_index":"testdbindex","_type":"testcollection","_id":"5322eb23fdfc233ffcfa02bb","_score":0.30685282, "_source" : {"_id":"5322eb23fdfc233ffcfa02bb","name":"stone"}}]}}

一個問題(我這邊測試不存在這個問題,創建meta后之前mongodb中已存在的數據也會被索引,不過還是把原作者的解決方案放在下面吧)

"在river建立之后的數據變動會體現在elasticsearh里,但是river建立前的數據變動因為沒有在oplog表里,不能被同步。解決方案是,遍歷一次需要導出的表,重新插入到另外一個表里,然后將river指定到這個新表,這樣新表的變動就可以全部體現在oplog里了。"

遍歷mongodb的表可以通過cursor來實現:

var myCursor = db.oldcollection.find( { }, {html:0} ); 
myCursor.forEach(function(myDoc) {db.newcollection.save(myDoc); });

附:mongodb&mongodb-river(elasticsearch)部署

elasticsearch使用示例如下:(index索引 對應 database數據庫,type類型 對應 table數據表)

1,查詢單個索引條目
curl -XGET 'http://localhost:9200/testdbindex/testcollection/532a45ad94af83f0122292cf'
{"_index":"testdbindex","_type":"testcollection","_id":"532a45ad94af83f0122292cf","_version":1,"found":true, "_source" : {"_id":"532a45ad94af83f0122292cf","name":"stone"}}

2,查詢多個索引條目
curl 'localhost:9200/testdbindex/testcollection/_mget' -d '{  
    "ids":["532a40f51d82291684692d1d","532a45ad94af83f0122292cf"]  
}'  

3,搜索指定域(類似關系型數據庫列字段)
curl -XGET 'http://localhost:9200/testdbindex/testcollection/532a40f51d82291684692d1d?fields=title'

4,搜索
curl -XGET 'http://localhost:9200/testdbindex/testcollection/_search' -d '{  
    "query":{  
        "term" : {"name":"penjin"}  
    }  
}'

5,在所有type類型里面搜索name=stone
curl -XGET 'http://localhost:9200/testdbindex/_search?q=name:stone'

6,在指定type為testcollection里面搜索
curl -XGET 'http://localhost:9200/testdbindex/testcollection/_search?q=name:stone'

7
查找count數目
curl -XGET 'http://localhost:9200/testdbindex/testcollection/_count?q=name:stone'
curl -XGET 'http://localhost:9200/testdbindex/_count?q=name:stone'

curl -XGET 'http://localhost:9200/testdbindex/blogs/_count' -d '
{
    "query" : {    
        "term" : { "name" : "stone" }
    }
}'

8,復雜查詢
/**
* 1,指定查詢起始及數目
* 2,指定排序
* 3,查詢指定域
* 4,查詢條件
*/
curl -XGET 'http://localhost:9200/testdbindex/blogs/_search' -d '
{
    "from" : 0, "size" : 10,
    "sort" : [
        { "name" : "desc" }
    ],
    "fields" : ["name"],
    "query" : {    
        "term" : { "name" : "stone" }
    }
}'
/**
* 依賴分詞
*/
curl -XGET 'http://localhost:9200/testdbindex/blogs/_search' -d '
{
    "query" : {    
        "match" : {
            _all : "stone"
        }
    }
}'
/**
* 類似數據庫like語句
*/
curl -XGET 'http://localhost:9200/testdbindex/blogs/_search' -d '
{
    "query" : {    
        "fuzzy_like_this" : {
            "fields" : ["name"],
            "like_text" : "ston",
            "max_query_terms" : 12
        }
    }
}'

9,更多高級查詢參照elasticsearch官方頁面

如果索引數據多了,elasticsearch的data目錄會很大,如果不得不清理磁盤的話,刪除索引即可。一般情況需要擴容磁盤。

root# curl -XDELETE 'http://localhost:9200/testdbindex'
root# curl -XDELETE 'http://localhost:9200/_river' (這行不需要)
{"acknowledged":true}

java語言使用jar包查詢等操作也很方便(依賴elasticsearch.jar與lucene-core.jar包,es的安裝包解壓后lib目錄下有)

package com.ciaos;

import java.util.Iterator;
import java.util.Map.Entry;

import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;

public class EsDemo {

    private static TransportClient client = null;

    public static void GetConnection(){
        client = new TransportClient().addTransportAddress(new InetSocketTransportAddress(
                "127.0.0.1", 9300));
    }

    public static void searchIndex() {

        QueryBuilder qb = QueryBuilders.termQuery("name", "stone");

        SearchResponse scrollResp = client.prepareSearch("testdbindex")
                        .setSearchType(SearchType.SCAN)
                        .setScroll(new TimeValue(60000))
                        .setQuery(qb.buildAsBytes())
                        .setSize(100).execute().actionGet();
        while (true) {
            scrollResp = client.prepareSearchScroll(scrollResp.getScrollId()).setScroll(new TimeValue(600000)).execute().actionGet();
            boolean hitsRead = false;
            for (SearchHit hit : scrollResp.getHits()) {
                hitsRead = true;
                Iterator<Entry<String, Object>> rpItor = hit.getSource().entrySet().iterator();
                while (rpItor.hasNext()) {
                     Entry<String, Object> rpEnt = rpItor.next();
                     System.out.println(rpEnt.getKey() + " : " + rpEnt.getValue());
                }
            }
            if (!hitsRead) {
                break;
            }
        }
    }

    public static void main(String[] args) {
        // TODO Auto-generated method stub
        GetConnection();
        searchIndex();
        
        client.close();
    }
}

運行結果如下:

_id : 532a49e294af83f0122292d3
name : stone
_id : 532a45ad94af83f0122292cf
name : stone


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM