Elasticsearch 集成 django(傳統數據庫)
介紹: 為提升 MySQL 單張數據庫表的查詢速率, 特實現以 ES 作為傳統數據庫查詢加速層使用的方案
1.django-elasticsearch 的使用
一 安裝
pip install django-elasticsearch-dsl # 會同時安裝 elasticsearch-dsl 和 elasticsearch
二 注冊 + 配置
INSTALLED_APPS = [ ... 'django_elasticsearch_dsl', ... ] # Es 配置 ELASTICSEARCH_DSL={ 'default': { 'hosts': 'localhost:9200' }, }
三 創建documents.py文件,做 orm 與 es 索引的映射
# 注意, 在app 目錄下創建 documents.py 文件 名字一定要對, 不然執行命令時 會找不到索引 from django_elasticsearch_dsl import Document, fields from django_elasticsearch_dsl.registries import registry from public_app.models import NcInfo @registry.register_document class NcInfoDocument(Document): # 注意命名規范 # 自定義索引字段類型 因為要作為mysql數據庫的延伸 , 所以 需要自定義字段為keyword 類型. 否則會被es自動分詞 pk = fields.IntegerField() base_product_name = fields.KeywordField() base_standard_model = fields.KeywordField() business_sla_category = fields.KeywordField() ds = fields.KeywordField() ip = fields.KeywordField() ops_azone_id = fields.KeywordField() ops_cluster_id = fields.KeywordField() ops_region_name = fields.KeywordField() business_nc_id = fields.KeywordField() business_instance_family = fields.KeywordField() class Index: name = 'nc_info' settings = { # 設置最大索引深度(**重要) 分頁查詢時要用到 'max_result_window': 10000000, # 切片個數 'number_of_shards':8, # 保存副本數 'number_of_replicas':2 } class Django: model = NcInfo # 與此文檔關聯的模型 # 要在Elasticsearch中建立索引的模型的字段 # fields 置空 則會根據上方的對象的屬性進行映射, 可直接寫orm模型類字段名, 會根據orm中的字段類型進行自動選擇文檔字段類型 fields = [] # 執行遷移時的 每次從mysql中數據讀取的條數. queryset_pagination = 50000
四 執行遷移
python manage.py search_index --rebuild
五 查看索引信息
2.ElasticSearch 的搭建
一 安裝JDK環境
因為ElasticSearch是用Java語言編寫的,所以必須安裝JDK的環境,並且是JDK 1.8以上,具體操作步驟自行百度
安裝完成查看java版本
java -version
二 官網下載最新版本
下載地址[https://www.elastic.co/cn/downloads/elasticsearch],選擇相應版本下載即可
三 下載其他版本
直接點擊https://www.elastic.co/cn/downloads/past-releases#elasticsearch
四 下載完成,啟動
解壓文件,切換到解壓文件路徑下,執行
cd elasticsearch-<version> #切換到路徑下 ./bin/elasticsearch #啟動es #如果你想把 Elasticsearch 作為一個守護進程在后台運行,那么可以在后面添加參數 -d 。 #如果你是在 Windows 上面運行 Elasticseach,你應該運行 bin\elasticsearch.bat 而不是 bin\elasticsearch
五 測試啟動是否成功
在瀏覽器輸入以下地址:http://127.0.0.1:9200/
即可看到如下內容:
{ "name" : "lqzMacBook.local", "cluster_name" : "elasticsearch", "cluster_uuid" : "G1DFg-u6QdGFvz8Z-XMZqQ", "version" : { "number" : "7.5.0", "build_flavor" : "default", "build_type" : "tar", "build_hash" : "e9ccaed468e2fac2275a3761849cbee64b39519f", "build_date" : "2019-11-26T01:06:52.518245Z", "build_snapshot" : false, "lucene_version" : "8.3.0", "minimum_wire_compatibility_version" : "6.8.0", "minimum_index_compatibility_version" : "6.0.0-beta1" }, "tagline" : "You Know, for Search" }
六 關閉es
#查看進程 ps -ef | grep elastic #干掉進程 kill -9 2382(進程號) #以守護進程方式啟動es elasticsearch -d
七 Es 集群配置
node1
# Cluster name
cluster.name: online-virt-elasticsearch
# Node name
node.name: es-node-1
# Whether this node is eligible to become master
node.master: true
# Whether this node stores data (enabled by default)
node.data: true
# Network binding; use this host's address
network.host: 0.0.0.0
# HTTP port for external requests (default 9200)
http.port: 9200
# TCP port for inter-node transport (default 9300; set to 9401 here to avoid
# clashing with a standalone instance)
transport.tcp.port: 9401
# Explicit list of master-eligible node names/IPs, used in the first election
cluster.initial_master_nodes: ["es-node-1","es-node-2","es-node-3","es-node-4","es-node-5"]
# Seed hosts for cluster auto-discovery
discovery.seed_hosts: ["localhost:9401","localhost:9402","localhost:9403", "localhost:9404","localhost:9405"]
bootstrap.memory_lock: false
bootstrap.system_call_filter: false
# Allow cross-origin (CORS) access
http.cors.enabled: true
http.cors.allow-origin: "*"
3.封裝 elasticsearch 集成 drf (只查詢接口使用)
elasticsearch 自定義包封裝
- __init__.py
from .filterset import EsFilterSet from .elasticsearch_generic import ElasticSearch from .pagination import EsStandardPagination
- elasticsearch_generic.py
# -*- coding: utf-8 -*-
from elasticsearch_dsl.document import IndexMeta

from . import filter_backend


class ElasticSearch(object):
    """Mixin for DRF views that adds ES search, filtering and pagination.

    Configure via class attributes:
      search_object     -- a Search instance or a Document class
      es_filter_class   -- optional EsFilterSet subclass
      es_paginate_class -- optional paginator class
    """

    search_object = None
    es_filter_class = None
    es_paginate_class = None
    es_paginator = None

    def get_search_object(self):
        """Return the configured Search, building one from a Document class if given."""
        assert self.search_object is not None, (
            "'%s' should either include a `search_object` attribute, "
            "or override the `get_search_object()` method."
            % self.__class__.__name__
        )
        obj = self.search_object
        if isinstance(obj, IndexMeta):
            # A Document class was supplied rather than a Search instance.
            obj = obj.search()
        return obj

    def filter_search_object(self, search_object):
        """Apply the view's filter set to `search_object`; pass through when none is set."""
        if not self.es_filter_class:
            return search_object
        backend = filter_backend.EsFilterBackend()
        return backend.filter_search_object(self.request, search_object, self)

    def paginate_search(self, search_obj, sort=None):
        """Paginate the search; returns the page, or None when no paginator is configured."""
        if not self.es_paginate_class:
            return None
        self.es_paginator = self.es_paginate_class(search_obj, self.request, sort)
        return self.es_paginator.paginate

    def get_es_paginated_response(self, data):
        """
        Return a paginated style `Response` object for the given output data.
        """
        return self.es_paginator.get_paginated_response(data)
- filter_backend.py
# -*- coding: utf-8 -*-
class EsFilterBackend(object):
    """Applies a view's EsFilterSet to an elasticsearch-dsl Search object."""

    @staticmethod
    def get_es_filter_class(view, search_object=None):
        """Return the view's `es_filter_class`, or None when the view declares none.

        When a filter class is present, sanity-check that the model declared on
        its Meta matches the model backing the search object.
        """
        es_filter_class = getattr(view, 'es_filter_class', None)
        if es_filter_class:
            filter_model = es_filter_class.Meta.model
            # BUGFIX: the failure message previously read `search_object.model`,
            # which does not exist (the attribute is `_model`), so a mismatch
            # raised AttributeError instead of the intended assertion message.
            assert issubclass(search_object._model, filter_model), \
                'EsFilterSet model %s does not match search_object model %s' % \
                (filter_model, search_object._model)
        return es_filter_class

    def filter_search_object(self, request, search_object, view):
        """Filter `search_object` through the view's filter set, or return it unchanged."""
        es_filter_class = self.get_es_filter_class(view, search_object)
        if es_filter_class:
            return es_filter_class(
                request.query_params,
                search_object=search_object,
                request=request,
            ).qs
        return search_object
- filters.py
# -*- coding: utf-8 -*-
from django_elasticsearch_dsl import fields
from elasticsearch_dsl import Q


class Filter(object):
    """Base filter: turns one request query parameter into an elasticsearch-dsl Q.

    field_name:  target document field (the legacy `name` kwarg is accepted
                 as an alias).
    lookup_expr: 'term'/'exact' for exact matching; 'gte'/'lte' for range
                 bounds. Anything else raises when the filter is applied.
    """

    field_class = fields.Field

    def __init__(self, field_name=None, lookup_expr='exact', **kwargs):
        self.field_name = field_name
        if field_name is None and 'name' in kwargs:
            # Backwards-compatible alias for field_name.
            self.field_name = kwargs.pop('name')
        self.lookup_expr = lookup_expr

    def filter(self, name, params):
        """Build a Q for parameter `name`; returns None when the parameter is absent/empty."""
        param = params.get(name)
        if not param:
            return None
        # BUGFIX: 'exact' is the declared default but previously fell through
        # to the "can not set lookup_expr" error; it now behaves like 'term'.
        if self.lookup_expr in ('term', 'exact'):
            # Multiple values separated by '&' or ',' become a `terms` query.
            if '&' in param:
                return Q('terms', **{self.field_name: param.split('&')})
            if ',' in param:
                return Q('terms', **{self.field_name: param.split(',')})
            return Q('term', **{self.field_name: param})
        if self.lookup_expr in ('gte', 'lte'):
            return Q('range', **{self.field_name: {self.lookup_expr: param}})
        raise Exception("{} can not set lookup_expr='{}'".format(self.field_name, self.lookup_expr))


class KeywordField(Filter):
    # Exact-match filter over an ES keyword (non-analyzed) field.
    field_class = fields.KeywordField


class IntegerField(Filter):
    field_class = fields.IntegerField


class DateField(Filter):
    field_class = fields.DateField
- filterset.py
# -*- coding: utf-8 -*-
import copy
from collections import OrderedDict

from elasticsearch_dsl import Q


class FilterSetOptions(object):
    """Snapshot of a filter set's inner Meta (model / fields / exclude)."""

    def __init__(self, options=None):
        self.model = getattr(options, 'model', None)
        self.fields = getattr(options, 'fields', None)
        self.exclude = getattr(options, 'exclude', None)


class FilterSetMetaclass(type):
    """Collects the declared Filter attributes into `base_filters` at class creation."""

    def __new__(cls, name, bases, attrs):
        new_class = super(FilterSetMetaclass, cls).__new__(cls, name, bases, attrs)
        new_class._meta = FilterSetOptions(getattr(new_class, 'Meta', None))
        new_class.base_filters = new_class.get_filters()
        return new_class


class BaseFilterSet(object):
    def __init__(self, data=None, search_object=None, request=None):
        self.params = data
        self.search_object = search_object
        self.is_bound = data is not None
        self.request = request
        self.filters = copy.deepcopy(self.base_filters)

    @property
    def qs(self):
        """Lazily build (and cache) the filtered search object."""
        if not hasattr(self, '_eqs'):
            if not self.is_bound:
                # No query parameters bound: return the search unfiltered.
                self._eqs = self.search_object
                return self._eqs
            must_query = []
            for name, filter_ in self.filters.items():
                query_ = filter_.filter(name, self.params)
                if query_:
                    must_query.append(query_)
            q = Q('bool', must=must_query)
            self._eqs = self.search_object.query(q)
        return self._eqs

    @classmethod
    def get_filters(cls):
        """Map every name in Meta.fields to the Filter declared on the class.

        BUGFIX: previously returned None when Meta.model was unset, which made
        `copy.deepcopy(self.base_filters)` produce None and iteration in `qs`
        fail; an empty OrderedDict is returned instead.
        """
        filters = OrderedDict()
        if not cls._meta.model:
            return filters
        for key in cls._meta.fields:
            assert getattr(cls, key, None), \
                'EsFilterSet filter field %s does not match %s fields ' % \
                (key, cls.__name__)
            filters[key] = getattr(cls, key)
        return filters


# BUGFIX: the original used the Python 2 `__metaclass__` class attribute,
# which Python 3 ignores, so `_meta` and `base_filters` were never set on
# subclasses. The metaclass is now applied with the Python 3 keyword syntax.
class EsFilterSet(BaseFilterSet, metaclass=FilterSetMetaclass):
    pass
- pagination.py
# -*- coding: utf-8 -*-
from utils.response import APIResponse


class EsStandardPagination(object):
    """Offset/limit pagination over an elasticsearch-dsl Search object.

    Slices the search with [start:end], sorts it (default: ascending `pk`,
    the ORM primary key), and converts the page back into a Django queryset
    via `to_queryset()`.
    """

    page_size = 20
    page_size_query_param = 'per_page'
    page_query_param = "page"  # URL query parameter carrying the page number
    max_page_size = 100

    def __init__(self, search_object, request, sort=None):
        self.search_object = search_object
        self.params = request.query_params
        self.page = 1
        self.count = search_object.count()
        self.total_page = 0
        self.sort = {'pk': {'order': 'asc'}} if not sort else sort

    @property
    def paginate(self):
        """Return the requested page as a queryset; updates page/page_size/total_page."""
        raw_size = self.params.get(self.page_size_query_param)
        self.page_size = int(raw_size) if raw_size else self.page_size
        raw_page = self.params.get(self.page_query_param)
        self.page = int(raw_page) if raw_page else 1
        start = (self.page - 1) * self.page_size
        end = start + self.page_size
        s = self.search_object.sort(self.sort)
        # BUGFIX: ceiling division. The old `count // page_size + 1` reported
        # a phantom empty page whenever count was an exact multiple of
        # page_size (and 1 page for an empty result; now 0).
        self.total_page = (self.count + self.page_size - 1) // self.page_size
        queryset = s[start:end].to_queryset()
        return queryset

    def get_paginated_response(self, data):
        """Wrap serialized `data` together with the pagination metadata."""
        return APIResponse(data={
            "count": self.count,
            "page": self.page,
            "per_page": self.page_size,
            "total_page": self.total_page,
            "results": data
        })
4.封裝包的使用, 快速寫接口
視圖
# Import the custom ElasticSearch mixin and pagination class.
from utils.elasticsearch import ElasticSearch, EsStandardPagination


# Inherit both GenericAPIView and the ElasticSearch mixin.
class NcInfoShowEsView(GenericAPIView, ElasticSearch):
    """List endpoint served from the nc_info ES index instead of MySQL."""

    # Search object built from the Document class declared in documents.py.
    search_object = documents.NcInfoDocument.search()
    # Filter set applied to incoming query parameters.
    es_filter_class = es_filter.NcInfoFilter
    # Pagination class.
    es_paginate_class = EsStandardPagination

    def get(self, request):
        s = self.filter_search_object(self.get_search_object())
        # paginate_search(search, sort=None) — sort defaults to
        # {'pk': {'order': 'asc'}}, where pk is the ORM primary key.
        page = self.paginate_search(s)
        if page is not None:
            # The paginator has already converted the ES page into a
            # queryset, so the ordinary model serializer applies.
            serializer = serializers.NcInfoSerializer(page, many=True)
            return self.get_es_paginated_response(serializer.data)
        # BUGFIX: the fallback used to serialize `page`, which is None on
        # this path; serialize the full (unpaginated) result set instead.
        serializer = serializers.NcInfoSerializer(s.to_queryset(), many=True)
        return APIResponse(data=serializer.data)
過濾組件
from . import models
from utils.elasticsearch import EsFilterSet
from utils.elasticsearch import filters


# Subclass of the custom EsFilterSet.
class NcInfoFilter(EsFilterSet):
    """Maps request query parameters to nc_info document fields.

    field_name:  the field in the ES document to match against.
    lookup_expr: 'term' for exact match; 'gte'/'lte' for range bounds.
    """

    base_product_name = filters.KeywordField(field_name='base_product_name', lookup_expr='term')
    business_sla_category = filters.KeywordField(field_name='business_sla_category', lookup_expr='term')
    ds = filters.KeywordField(field_name='ds', lookup_expr='term')
    # BUGFIX: every name listed in Meta.fields must have a declared filter
    # attribute (get_filters asserts this at class creation); these five were
    # registered in Meta.fields but never declared.
    business_instance_family = filters.KeywordField(field_name='business_instance_family', lookup_expr='term')
    ops_cluster_id = filters.KeywordField(field_name='ops_cluster_id', lookup_expr='term')
    ops_region_name = filters.KeywordField(field_name='ops_region_name', lookup_expr='term')
    ops_azone_id = filters.KeywordField(field_name='ops_azone_id', lookup_expr='term')
    base_standard_model = filters.KeywordField(field_name='base_standard_model', lookup_expr='term')

    class Meta:
        # ORM model this filter set corresponds to.
        model = models.NcInfo
        # Only fields registered here take part in filtering.
        fields = ['base_product_name', 'business_sla_category', 'ds',
                  'business_instance_family', 'ops_cluster_id',
                  'ops_region_name', 'ops_azone_id', 'base_standard_model']
聚合查詢
# elasticsearch-dsl offers no tidy interface for this aggregation; field
# collapsing (`collapse` via Search.extra) is used to obtain distinct values.
class NcInfoOptionsView(GenericAPIView, ElasticSearch):
    """Builds the drop-down option lists for the nc_info list page."""

    search_object = documents.NcInfoDocument.search()
    es_filter_class = es_filter.NcInfoFilter

    def get(self, request, *args, **kwargs):
        # response key -> document field whose distinct values populate it
        params_map = {
            'standard_model_list': 'base_standard_model',
            'region_name_list': 'ops_region_name',
            'azone_id_list': 'ops_azone_id',
            'cluster_id_list': 'ops_cluster_id',
            'instance_family_list': 'business_instance_family',
        }
        results = {}
        all_s = self.get_search_object()
        # These two option lists are global: they ignore the request filters.
        for key, field in (('sla_category_list', 'business_sla_category'),
                           ('product_name_list', 'base_product_name')):
            collapsed = all_s.extra(collapse={'field': field})
            results[key] = [getattr(hit, field, None) for hit in collapsed]
        # The remaining lists honour the currently applied filters.
        s = self.filter_search_object(all_s)
        for key, field in params_map.items():
            s = s.extra(collapse={'field': field})
            results[key] = [getattr(hit, field) for hit in s[0:100]]
        return APIResponse(data=results)
        # See the official docs for Search.extra()-based collapse queries.
5.使用 go-mysql-elasticsearch 對 mysql 數據庫進行數據的同步
如果所有的數據都是通過orm 進行操作 則可以使用此方法, 不需要使用 go-mysql-elasticsearch
django-elasticsearch-dsl 可以使用信號量對mysql數據自動更新同步進es 只需要在 settings.py 文件中 添加以下配置
# 實時同步 用的是 django的信號量, 不需要額外的參數 ELASTICSEARCH_DSL_SIGNAL_PROCESSOR = 'django_elasticsearch_dsl.signals.RealTimeSignalProcessor'
mysql 開啟 binlog
server_id = 7578 log_bin = /apsarapangu/disk8/mysql/mysql-bin/bin-log binlog_format = ROW
下載go-mysql-elasticsearch
編譯
cd /apsarapangu/disk8/go-mysql-elasticsearch
make
配置文件
配置文件在 項目目錄的 etc文件夾下, river.toml
# MySQL address, user and password
# user must have replication privilege in MySQL.
my_addr = "127.0.0.1:3306"
my_user = "root"
my_pass = "virtweb@2017"
my_charset = "utf8"

# Set true when elasticsearch use https
#es_https = false

# Elasticsearch address
es_addr = "localhost:9200"

# Elasticsearch user and password, maybe set by shield, nginx, or x-pack
es_user = ""
es_pass = ""

# Path to store data, like master.info, if not set or empty,
# we must use this to support breakpoint resume syncing.
# TODO: support other storage, like etcd.
data_dir = "./var"

# Inner Http status address
stat_addr = "127.0.0.1:12800"
stat_path = "/metrics"

# pseudo server id like a slave
server_id = 1001

# mysql or mariadb
flavor = "mysql"

# mysqldump execution path
# if not set or empty, ignore mysqldump.
# mysqldump = "mysqldump"

# if we have no privilege to use mysqldump with --master-data,
# we must skip it.
#skip_master_data = false

# minimal items to be inserted in one bulk
bulk_size = 128

# force flush the pending requests if we don't have enough items >= bulk_size
flush_bulk_time = "200ms"

# Ignore table without primary key
skip_no_pk_table = false

# MySQL data source
[[source]]
schema = "virt_dashboard"

# Only below tables will be synced into Elasticsearch.
# "t_[0-9]{4}" is a wildcard table format, you can use it if you have many sub tables, like table_0000 - table_1023
# I don't think it is necessary to sync all tables in a database.
tables = ["nc_info"]

[[rule]]
schema = "virt_dashboard"
table = "nc_info"
index = "nc_info"
type = "_doc"
# BUGFIX: removed a stray trailing "(virt-dashboard)" token (a pasted
# shell/virtualenv prompt) that made this file invalid TOML.
執行監控
./bin/go-mysql-elasticsearch --config=./etc/river.toml # 加 & 為后台運行
注意 在go-mysql-elasticsearch 目錄下會 生成 var 文件夾. 里面有 監測mysql binlog 日志的記錄, 當 go-mysql-elasticsearch 意外停止后 啟動后會從停止的節點開始繼續同步, 不必擔心數據的丟失.
6.elasticsearch-head 的使用
此工具需要npm
下載
安裝依賴
npm install
運行
npm run start
7.Kibana 安裝及使用
Kibana 應該與 es 保持版本的一致
Kibana的配置
# Kibana is served by a back end server. This setting specifies the port to use. server.port: 5601 server.host: "0.0.0.0" elasticsearch.hosts: ["http://localhost:9200"] xpack.monitoring.ui.container.elasticsearch.enabled: false