Elasticsearch 集成 django(傳統數據庫)


Elasticsearch 集成 django(傳統數據庫)

介紹:  增加mysql一張數據庫表的查詢速率, 特實現es作為傳統數據庫使用的方案

 

 

1.django-elasticsearch 的使用

一 安裝

pip install django-elasticsearch-dsl
#  會同時安裝 elasticsearch-dsl 和  elasticsearch

 

二 注冊 + 配置

INSTALLED_APPS = [
    ...
    'django_elasticsearch_dsl',
    ...
]

# Es 配置
ELASTICSEARCH_DSL={
    'default': {
        'hosts': 'localhost:9200'
    },
}

 

三 創建documents.py文件,做 orm 與 es 索引的映射

# 注意, 在app 目錄下創建 documents.py 文件   名字一定要對, 不然執行命令時 會找不到索引
from django_elasticsearch_dsl import Document, fields
from django_elasticsearch_dsl.registries import registry
from public_app.models import NcInfo

@registry.register_document
class NcInfoDocument(Document):  #  注意命名規范
     # 自定義索引字段類型  因為要作為mysql數據庫的延伸 , 所以 需要自定義字段為keyword 類型. 否則會被es自動分詞
    pk = fields.IntegerField()  
    base_product_name = fields.KeywordField()
    base_standard_model = fields.KeywordField()
    business_sla_category = fields.KeywordField()
    ds = fields.KeywordField()
    ip = fields.KeywordField()
    ops_azone_id = fields.KeywordField()
    ops_cluster_id = fields.KeywordField()
    ops_region_name = fields.KeywordField()
    business_nc_id = fields.KeywordField()
    business_instance_family = fields.KeywordField()

    class Index:
        name = 'nc_info'
        settings = {
            # 設置最大索引深度(**重要)  分頁查詢時要用到
            'max_result_window': 10000000,
            # 切片個數
            'number_of_shards':8,
            # 保存副本數
            'number_of_replicas':2
        }

    class Django:
        model = NcInfo  # 與此文檔關聯的模型
        # 要在Elasticsearch中建立索引的模型的字段
         #  fields 置空 則會根據上方的對象的屬性進行映射,  可直接寫orm模型類字段名, 會根據orm中的字段類型進行自動選擇文檔字段類型
        fields = []
        # 執行遷移時的 每次從mysql中數據讀取的條數. 
        queryset_pagination = 50000

 

四 執行遷移

python manage.py search_index --rebuild

 

五 查看索引信息

 

 

 

2.ElasticSearch 的搭建

一 安裝JDK環境 

因為ElasticSearch是用Java語言編寫的,所以必須安裝JDK的環境,並且是JDK 1.8以上,具體操作步驟自行百度

安裝完成查看java版本

java -version

 

二 官網下載最新版本

下載地址[https://www.elastic.co/cn/downloads/elasticsearch],選擇相應版本下載即可

 

 

三 下載其他版本

直接點擊https://www.elastic.co/cn/downloads/past-releases#elasticsearch

 

 

三 下載完成,啟動 

解壓文件,切換到解壓文件路徑下,執行

cd elasticsearch-<version> #切換到路徑下
./bin/elasticsearch  #啟動es
#如果你想把 Elasticsearch 作為一個守護進程在后台運行,那么可以在后面添加參數 -d 。

#如果你是在 Windows 上面運行 Elasticseach,你應該運行 bin\elasticsearch.bat 而不是 bin\elasticsearch

 

四 測試啟動是否成功

在瀏覽器輸入以下地址:http://127.0.0.1:9200/

即可看到如下內容:

{
  "name" : "lqzMacBook.local",
  "cluster_name" : "elasticsearch",
  "cluster_uuid" : "G1DFg-u6QdGFvz8Z-XMZqQ",
  "version" : {
    "number" : "7.5.0",
    "build_flavor" : "default",
    "build_type" : "tar",
    "build_hash" : "e9ccaed468e2fac2275a3761849cbee64b39519f",
    "build_date" : "2019-11-26T01:06:52.518245Z",
    "build_snapshot" : false,
    "lucene_version" : "8.3.0",
    "minimum_wire_compatibility_version" : "6.8.0",
    "minimum_index_compatibility_version" : "6.0.0-beta1"
  },
  "tagline" : "You Know, for Search"
}
 
           

五 關閉es 

#查看進程
ps -ef | grep elastic
#干掉進程
kill -9 2382(進程號)
#以守護進程方式啟動es
elasticsearch -d

 

六 Es 集群配置

node1

# 集群名稱
cluster.name: online-virt-elasticsearch
# 節點名稱
node.name: es-node-1
# 是否可以成為master節點
node.master: true
# 是否允許該節點存儲數據,默認開啟
node.data: true
# 網絡綁定,寫自己的主機
network.host: 0.0.0.0
# 設置對外服務的http端口,默認為9200,為和單機分開,我設置9201
http.port: 9200
# 設置節點間交互的tcp端口,默認是9300,為和單機分開,我設置9301
transport.tcp.port: 9401
# 手動指定可以成為 mater 的所有節點的 name 或者 ip,這些配置將會在第一次選舉中進行計算
cluster.initial_master_nodes: ["es-node-1","es-node-2","es-node-3","es-node-4","es-node-5"]
 ##設置集群自動發現機器ip的集合
discovery.seed_hosts: ["localhost:9401","localhost:9402","localhost:9403", "localhost:9404","localhost:9405"]
bootstrap.memory_lock: false
bootstrap.system_call_filter: false
#允許跨域訪問
http.cors.enabled: true
http.cors.allow-origin: "*"

 

 

 

3.封裝  elasticsearch  集成 drf  (只查詢接口使用)

elasticsearch 自定義包封裝 

  • __init__ .py
from .filterset import EsFilterSet
from .elasticsearch_generic import ElasticSearch
from .pagination import EsStandardPagination
  • elasticsearch_generic.py
# -*- coding: utf-8 -*-
from elasticsearch_dsl.document import IndexMeta
from . import filter_backend

class ElasticSearch(object):
    search_object = None
    es_filter_class = None
    es_paginate_class = None
    es_paginator = None

    def get_search_object(self):
        assert self.search_object is not None, (
                "'%s' should either include a `search_object` attribute, "
                "or override the `get_search_object()` method."
                % self.__class__.__name__
        )
        search_object = self.search_object
        if isinstance(search_object, IndexMeta):
            search_object = search_object.search()
        return search_object

    def filter_search_object(self, search_object):

        if self.es_filter_class:
            return filter_backend.EsFilterBackend().filter_search_object(self.request, search_object, self)
        return search_object

    def paginate_search(self, search_obj, sort=None):
        if self.es_paginate_class:
            self.es_paginator = self.es_paginate_class(search_obj, self.request, sort)
            return self.es_paginator.paginate

    def get_es_paginated_response(self, data):
        """
        Return a paginated style `Response` object for the given output data.
        """
        return self.es_paginator.get_paginated_response(data)
  • filter_backend.py
# -*- coding: utf-8 -*-
class EsFilterBackend(object):
    @staticmethod
    def get_es_filter_class(view, search_object=None):
        es_filter_class = getattr(view, 'es_filter_class', None)

        if es_filter_class:
            filter_model = es_filter_class.Meta.model
            assert issubclass(search_object._model, filter_model), \
                'EsFilterSet model %s does not match search_object model %s' % \
                (filter_model, search_object.model)
            return es_filter_class

    def filter_search_object(self, request, search_object, view):
        es_filter_class = self.get_es_filter_class(view, search_object)

        if es_filter_class:
            return es_filter_class(request.query_params, search_object=search_object, request=request).qs

        return search_object
  • filters.py
# -*- coding: utf-8 -*-
from django_elasticsearch_dsl import fields
from elasticsearch_dsl import Q


class Filter(object):
    field_class = fields.Field

    def __init__(self, field_name=None, lookup_expr='exact', **kwargs):
        self.field_name = field_name
        if field_name is None and 'name' in kwargs:
            self.field_name = kwargs.pop('name')
        self.lookup_expr = lookup_expr

    def filter(self, name, params):
        if not params.get(name):
            return
        if self.lookup_expr == 'term':
            param = params.get(name)
            key = 'term'
            if '&' in param:
                values = param.split('&')
                key = 'terms'
            elif ',' in param:
                values = param.split(',')
                key = 'terms'
            else:
                values = param
            query = Q(key, **{self.field_name: values})
        elif self.lookup_expr == 'gte' or self.lookup_expr == "lte":
            m = {self.lookup_expr: params.get(name)}
            q = {self.field_name: m}
            # query = {'range': q}
            query = Q('range', **q)
        else:
            raise Exception("{} can not set lookup_expr='{}'".format(self.field_name, self.lookup_expr))
        return query


class KeywordField(Filter):
    field_class = fields.KeywordField


class IntegerField(Filter):
    field_class = fields.IntegerField


class DateField(Filter):
    field_class = fields.DateField
  • filterset.py
# -*- coding: utf-8 -*-
import copy
from collections import OrderedDict
from elasticsearch_dsl import Q


class FilterSetOptions(object):
    def __init__(self, options=None):
        self.model = getattr(options, 'model', None)
        self.fields = getattr(options, 'fields', None)
        self.exclude = getattr(options, 'exclude', None)


class FilterSetMetaclass(type):
    def __new__(cls, name, bases, attrs):
        new_class = super(FilterSetMetaclass, cls).__new__(cls, name, bases, attrs)
        new_class._meta = FilterSetOptions(getattr(new_class, 'Meta', None))
        new_class.base_filters = new_class.get_filters()
        return new_class


class BaseFilterSet(object):

    def __init__(self, data=None, search_object=None, request=None):
        self.params = data
        self.search_object = search_object
        self.is_bound = data is not None
        self.request = request
        self.filters = copy.deepcopy(self.base_filters)

    @property
    def qs(self):
        if not hasattr(self, '_eqs'):
            if not self.is_bound:
                self._eqs = self.search_object
                return self._eqs
            _eqs = self.search_object
            must_query = []
            for name, filter_ in self.filters.items():
                query_ = filter_.filter(name, self.params)
                if query_:
                    must_query.append(query_)
            q = Q('bool', must=must_query)
            self._eqs = _eqs.query(q)
        return self._eqs

    @classmethod
    def get_filters(cls):
        filters = OrderedDict()
        if not cls._meta.model:
            return
        fields = cls._meta.fields
        for key in fields:
            assert getattr(cls, key, None), \
                'EsFilterSet filter field %s does not match  %s fields ' % \
                (key, cls.__name__)
            filters.update({key: getattr(cls, key)})
        return filters


class EsFilterSet(BaseFilterSet):
    __metaclass__ = FilterSetMetaclass
    pass
  • pagination.py
# -*- coding: utf-8 -*-

from utils.response import APIResponse


class EsStandardPagination(object):
    page_size = 20
    page_size_query_param = 'per_page'
    page_query_param = "page"  # url參數
    max_page_size = 100

    def __init__(self, search_object, request, sort=None):
        self.search_object = search_object
        self.params = request.query_params
        self.page = 1
        self.count = search_object.count()
        self.total_page = 0
        self.sort = {'pk': {'order': 'asc'}} if not sort else sort

    @property
    def paginate(self):
        self.page_size = int(self.params.get(self.page_size_query_param) if self.params.get(
            self.page_size_query_param) else self.page_size)
        self.page = int(self.params.get(self.page_query_param) if self.params.get(self.page_query_param) else 1)
        start = (self.page - 1) * self.page_size
        end = start + self.page_size
        s = self.search_object.sort(self.sort)
        self.total_page = (self.count // self.page_size) + 1
        queryset = s[start:end].to_queryset()
        return queryset

    def get_paginated_response(self, data):
        return APIResponse(data={
            "count": self.count,
            "page": self.page,
            "per_page": self.page_size,
            "total_page": self.total_page,
            # 'next': self.get_next_link(),
            # 'previous': self.get_previous_link(),
            "results": data
        })

 

 

 

4.封裝包的使用, 快速寫接口

視圖 

# 導入自定義的 ElasticSearch 和 EsStandardPagination
from utils.elasticsearch import ElasticSearch, EsStandardPagination

# 繼承  GenericAPIView 和 ElasticSearch
class NcInfoShowEsView(GenericAPIView, ElasticSearch):
    # 獲取 search_object, 此為 documents 定義的文檔類. 
    search_object = documents.NcInfoDocument.search()
    # 搜索類
    es_filter_class = es_filter.NcInfoFilter
    # 分頁類
    es_paginate_class = EsStandardPagination

    def get(self, request):
        s = self.filter_search_object(self.get_search_object())
        # 分頁組件會把分頁后的es對象 轉換成 queryset 對象. paginate_search 有兩個參數, 一是查詢后的 search對象, 第二個是排序參數, 默認是 {'pk': {'order': 'asc'}} pk為從orm 獲取的 id,或 pk . 
        page = self.paginate_search(s)
        if page is not None:
            # 視圖中 分頁組件會把分頁后的es對象 轉換成 queryset 對象.  可以使用序列化類進行序列化. 完成
            serializer = serializers.NcInfoSerializer(page, many=True)
            return self.get_es_paginated_response(serializer.data)
        serializer = serializers.NcInfoSerializer(page, many=True)
        return APIResponse(data=serializer.data)
 
           

過濾組件

from . import models
from utils.elasticsearch import EsFilterSet
from utils.elasticsearch import filters

# 繼承自定義esFilterSet
class NcInfoFilter(EsFilterSet):
    # 前端攜帶參數來的key  對應的文檔字段類型
    # field_name:  對應的es索引 文檔中的字段
       # lookup_expr: 過濾方式, 目前有 term方式 為精准匹配.  lte, gte  大於小於.
    base_product_name = filters.KeywordField(field_name='base_product_name', lookup_expr='term')
    business_sla_category = filters.KeywordField(field_name='business_sla_category', lookup_expr='term')
    ds = filters.KeywordField(field_name='ds', lookup_expr='term')

    class Meta:
        # 過濾所對應的orm表
        model = models.NcInfo
        # 參與過濾的字段, 需要在此注冊, 否則不參與過濾. 
        fields = ['base_product_name', 'business_sla_category', 'ds', 'business_instance_family', 'ops_cluster_id',
                  'ops_region_name', 'ops_azone_id','base_standard_model']
 
           

聚合查詢

#  聚合查詢 elasticsearch  沒有提供很好的接口.  目前使用 collapse 進行聚合查詢
class NcInfoOptionsView(GenericAPIView, ElasticSearch):
    """
    nc_info數據展示列表頁篩選數據
    """
    search_object = documents.NcInfoDocument.search()
    es_filter_class = es_filter.NcInfoFilter

    def get(self, request, *args, **kwargs):
        params_map = {'standard_model_list': 'base_standard_model', 'region_name_list': 'ops_region_name',
                      'azone_id_list': 'ops_azone_id', 'cluster_id_list': 'ops_cluster_id',
                      'instance_family_list': 'business_instance_family'}
        results = {}
        all_s = self.get_search_object()
        sla_category_list = all_s.extra(collapse={'field': 'business_sla_category'})
        results.update({'sla_category_list': [getattr(i, 'business_sla_category', None) for i in sla_category_list]})
        product_name_list = all_s.extra(collapse={'field': 'base_product_name'})
        results.update({'product_name_list': [getattr(i, 'base_product_name', None) for i in product_name_list]})
        s = self.filter_search_object(all_s)
        for key, param in params_map.items():
            s = s.extra(collapse={'field': param})
            results.update({key: [getattr(i, param) for i in s[0:100]]})

        return APIResponse(data=results)

# 使用 search對象的 .extra()  進行聚合查詢 具體請去看官方文檔

 




5.使用 go-mysql-elasticsearch 對 mysql 數據庫進行數據的同步

如果所有的數據都是通過orm 進行操作 則可以使用此方法, 不需要使用 go-mysql-elasticsearch

django-elasticsearch-dsl 可以使用信號量對mysql數據自動更新同步進es  只需要在 settings.py 文件中 添加以下配置

# 實時同步 用的是 django的信號量, 不需要額外的參數
ELASTICSEARCH_DSL_SIGNAL_PROCESSOR = 'django_elasticsearch_dsl.signals.RealTimeSignalProcessor'

mysql 開啟 binlog

server_id = 7578
log_bin = /apsarapangu/disk8/mysql/mysql-bin/bin-log
binlog_format = ROW

下載go-mysql-elasticsearch

編譯

cd /apsarapangu/disk8/go-mysql-elasticsearch
make
 
           

配置文件

配置文件在 項目目錄的 etc文件夾下, river.toml

# MySQL address, user and password
# user must have replication privilege in MySQL.
my_addr = "127.0.0.1:3306"
my_user = "root"
my_pass = "virtweb@2017"
my_charset = "utf8"

# Set true when elasticsearch use https
#es_https = false
# Elasticsearch address
es_addr = "localhost:9200"
# Elasticsearch user and password, maybe set by shield, nginx, or x-pack
es_user = ""
es_pass = ""

# Path to store data, like master.info, if not set or empty,
# we must use this to support breakpoint resume syncing.
# TODO: support other storage, like etcd.
data_dir = "./var"

# Inner Http status address
stat_addr = "127.0.0.1:12800"
stat_path = "/metrics"

# pseudo server id like a slave
server_id = 1001

# mysql or mariadb
flavor = "mysql"

# mysqldump execution path
# if not set or empty, ignore mysqldump.
# mysqldump = "mysqldump"

# if we have no privilege to use mysqldump with --master-data,
# we must skip it.
#skip_master_data = false

# minimal items to be inserted in one bulk
bulk_size = 128

# force flush the pending requests if we don't have enough items >= bulk_size
flush_bulk_time = "200ms"

# Ignore table without primary key
skip_no_pk_table = false

# MySQL data source
[[source]]
schema = "virt_dashboard"

# Only below tables will be synced into Elasticsearch.
# "t_[0-9]{4}" is a wildcard table format, you can use it if you have many sub tables, like table_0000 - table_1023
# I don't think it is necessary to sync all tables in a database.
tables = ["nc_info"]

[[rule]]
schema = "virt_dashboard"
table = "nc_info"
index = "nc_info"
type = "_doc"
(virt-dashboard)
 
           

執行監控

./bin/go-mysql-elasticsearch --config=./etc/river.toml 
# 加 & 為后台運行 

注意  在go-mysql-elasticsearch 目錄下會 生成 var 文件夾.  里面有 監測mysql binlog 日志的記錄, 當 go-mysql-elasticsearch 意外停止后 啟動后會從停止的節點開始繼續同步, 不必擔心數據的丟失.

 

  

6.elasticsearch-head 的使用 

此工具需要npm

下載

安裝依賴

npm install

運行

npm run start

 

 

7.Kibana 安裝及使用 

Kibana 應該與 es 保持版本的一致

Kibana的配置

# Kibana is served by a back end server. This setting specifies the port to use.
server.port: 5601

server.host: "0.0.0.0"
elasticsearch.hosts: ["http://localhost:9200"]
xpack.monitoring.ui.container.elasticsearch.enabled: false

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM