python elasticsearch環境搭建


windows linux環境搭建

windows下載zip
linux下載tar
下載地址:https://www.elastic.co/downloads/elasticsearch

解壓后運行:bin/elasticsearch (or bin\elasticsearch.bat on Windows)
檢查是否成功:訪問 http://localhost:9200

linux下不能以root用戶運行,
普通用戶運行報錯:
java.nio.file.AccessDeniedException

原因:當前用戶沒有執行權限
解決方法: chown linux用戶名 elasticsearch安裝目錄 -R
例如:chown ealsticsearch /data/wwwroot/elasticsearch-6.2.4 -R
PS:其他Java軟件報.AccessDeniedException錯誤也可以同樣方式解決,給 執行用戶相應的目錄權限即可

代碼實例

如下的代碼實現類似鏈家網小區搜索功能。
從文件讀取小區及地址信息寫入es,然后通過小區所在城市code及搜索關鍵字 匹配到對應小區。
代碼主要包含三部分內容:
1.創建索引
2.用bulk將批量數據存儲到es
3.數據搜索
注意:
代碼的es版本交低2.xx版本,高版本在創建的索引數據類型有所不同

#coding:utf8
from __future__ import unicode_literals
import os
import time
import config
from datetime import datetime
from elasticsearch import Elasticsearch
from elasticsearch.helpers import bulk

class ElasticSearch():
    def __init__(self, index_name,index_type,ip ="127.0.0.1"):
        '''
        :param index_name: 索引名稱
        :param index_type: 索引類型
        '''
        self.index_name =index_name
        self.index_type = index_type
        # 無用戶名密碼狀態
        #self.es = Elasticsearch([ip])
        #用戶名密碼狀態
        self.es = Elasticsearch([ip],http_auth=('elastic', 'password'),port=9200)
    def create_index(self,index_name="ftech360",index_type="community"):
        '''
        創建索引,創建索引名稱為ott,類型為ott_type的索引
        :param ex: Elasticsearch對象
        :return:
        '''
        #創建映射
        _index_mappings = {
            "mappings": {
                self.index_type: {
                    "properties": {
                        "city_code": {
                            "type": "string",
                            # "index": "not_analyzed"
                        },
                        "name": {
                            "type": "string",
                            # "index": "not_analyzed"
                        },
                        "address": {
                            "type": "string",
                            # "index": "not_analyzed"
                        }
                    }
                }

            }
        }
        if self.es.indices.exists(index=self.index_name) is True:
            self.es.indices.delete(index=self.index_name)
        res = self.es.indices.create(index=self.index_name, body=_index_mappings)
        print res

    def build_data_dict(self):
        name_dict = {}
        with open(os.path.join(config.datamining_dir,'data_output','house_community.dat')) as f:
            for line in f:
                line_list = line.decode('utf-8').split('\t')
                community_code = line_list[6]
                name = line_list[7]
                city_code = line_list[0]
                name_dict[community_code] = (name,city_code)

        address_dict = {}
        with open(os.path.join(config.datamining_dir,'data_output','house_community_detail.dat')) as f:
            for line in f:
                line_list = line.decode('utf-8').split('\t')
                community_code = line_list[6]
                address = line_list[10]
                address_dict[community_code] = address

        return name_dict,address_dict

    def bulk_index_data(self,name_dict,address_dict):
        '''
        用bulk將批量數據存儲到es
        :return:
        '''
        list_data = []
        for community_code, data in name_dict.items():
            tmp = {}
            tmp['code'] = community_code
            tmp['name'] = data[0]
            tmp['city_code'] = data[1]
            
            if community_code in address_dict:
                tmp['address'] = address_dict[community_code]
            else:
                tmp['address'] = ''

            list_data.append(tmp)
        ACTIONS = []
        for line in list_data:
            action = {
                "_index": self.index_name,
                "_type": self.index_type,
                "_id": line['code'], #_id 小區code
                "_source": {
                    "city_code": line['city_code'],
                    "name": line['name'],
                    "address": line['address']
                    }
            }
            ACTIONS.append(action)
            # 批量處理
        success, _ = bulk(self.es, ACTIONS, index=self.index_name, raise_on_error=True)
        #單條寫入 單條寫入速度很慢
        #self.es.index(index=self.index_name,doc_type="doc_type_test",body = action)

        print('Performed %d actions' % success)

    def delete_index_data(self,id):
        '''
        刪除索引中的一條
        :param id:
        :return:
        '''
        res = self.es.delete(index=self.index_name, doc_type=self.index_type, id=id)
        print res

    def get_data_id(self,id):
        res = self.es.get(index=self.index_name, doc_type=self.index_type,id=id)
        # # 輸出查詢到的結果
        print res['_source']['city_code'], res['_id'],  res['_source']['name'], res['_source']['address']

    def get_data_by_body(self, name, city_code):
        # doc = {'query': {'match_all': {}}}
        doc = {
            "query": {
                "bool":{
                    "filter":{
                        "term":{
                        "city_code": city_code
                        }
                    },
                    "must":{
                        "multi_match": {
                            "query": name,
                            "type":"phrase_prefix",
                            "fields": ['name^3', 'address'],
                            "slop":1,
                            
                            }

                    }
                }
            }
        }
        _searched = self.es.search(index=self.index_name, doc_type=self.index_type, body=doc)
        data = _searched['hits']['hits']
        return data
         

if __name__=='__main__':
    #數據插入es
    obj = ElasticSearch("ftech360","community")
    obj.create_index()
    name_dict, address_dict = obj.build_data_dict()
    obj.bulk_index_data(name_dict,address_dict)

    #從es讀取數據
    obj2 = ElasticSearch("ftech360","community")
    obj2.get_data_by_body(u'保利','510100')


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM