一.使用logstash同步訂單數據(訂單表和訂單項表)到ElasticSearch:
1.到官網下載logstash:https://www.elastic.co/cn/downloads/logstash
2.安裝logstash前,需要確保已經安裝好java的jdk環境
3.下載后解壓。之后千萬別到bin目錄直接雙擊logstash.bat來啟動,這樣會報錯的(正確的啟動方式見第8步,需要在命令行中用-f指定配置文件)
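4.接下來,在logstash安裝目錄找到config文件夾,在那里新增一個文件夾,我新建的為shop文件夾,然后在里面添加如下文件:mysql-connector-java-5.1.6-bin.jar、logstash_order.conf、order_mast.sql、order_item.sql、last_run_order.txt、last_run_item.txt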
5.開始時last_run_item.txt和last_run_order.txt文件是沒數據的
6.logstash_order.conf文件的配置如下:
# Sample Logstash configuration for a MySQL (JDBC) -> Logstash -> Elasticsearch pipeline.
# Two jdbc inputs poll the order table and the order-item table; the output section
# routes each event to its own index based on the "type" set by the input.
input {
  jdbc {
    # "type" is used by the output section to tell the two tables apart.
    # It can be omitted when only a single table is synchronised.
    type => "order_mast"
    # JDBC driver jar placed in the shop folder.
    jdbc_driver_library => "../config/shop/mysql-connector-java-5.1.6-bin.jar"
    jdbc_paging_enabled => "true"
    jdbc_page_size => "2000"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    # Change the connection string, user and password to match your environment.
    jdbc_connection_string => "jdbc:mysql://192.168.50.117:3306/shop_dm?useUnicode=true&characterEncoding=UTF-8&allowMultiQueries=true&useSSL=false"
    jdbc_user => "shop"
    jdbc_password => "shop"
    # Cron-style schedule: run once every minute.
    schedule => "* * * * *"
    # SQL file placed in the shop folder.
    statement_filepath => "../config/shop/order_mast.sql"
    record_last_run => true
    use_column_value => false
    # Stores the time of the previous run; the SQL uses it as :sql_last_value,
    # which is what makes the sync incremental.
    last_run_metadata_path => "../config/shop/last_run_order.txt"
    clean_run => false
    # Whether to convert column names to lower case.
    lowercase_column_names => false
  }

  jdbc {
    # "type" is used by the output section to tell the two tables apart.
    # It can be omitted when only a single table is synchronised.
    type => "order_item"
    # JDBC driver jar placed in the shop folder.
    jdbc_driver_library => "../config/shop/mysql-connector-java-5.1.6-bin.jar"
    jdbc_paging_enabled => "true"
    jdbc_page_size => "2000"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    # Change the connection string, user and password to match your environment.
    jdbc_connection_string => "jdbc:mysql://192.168.50.117:3306/shop_dm?useUnicode=true&characterEncoding=UTF-8&allowMultiQueries=true&useSSL=false"
    jdbc_user => "shop"
    jdbc_password => "shop"
    # Cron-style schedule: run once every minute.
    schedule => "* * * * *"
    # SQL file placed in the shop folder.
    statement_filepath => "../config/shop/order_item.sql"
    record_last_run => true
    use_column_value => false
    # Stores the time of the previous run; the SQL uses it as :sql_last_value,
    # which is what makes the sync incremental.
    last_run_metadata_path => "../config/shop/last_run_item.txt"
    clean_run => false
    # Whether to convert column names to lower case.
    lowercase_column_names => false
  }
}

filter {
  # Author's note: the jdbc input defaults to json and no way to change that was found,
  # so the json filter below is left disabled.
  # json {
  #   source => "message"
  #   remove_field => ["message"]
  # }
  mutate {
    # Fields that should not be written to Elasticsearch.
    remove_field => ["@timestamp", "@version"]
  }
}

output {
  if [type] == "order_mast" {
    elasticsearch {
      hosts => ["http://localhost:9200"]
      # If Elasticsearch requires credentials, uncomment and adjust the two lines below.
      # user => elastic
      # password => "elastic@test.com"
      index => "shop_order_mast"
      # Mapping types no longer exist from ES 7.0 on; drop this setting for 7.x and later.
      document_type => "order_mast"
      document_id => "%{cod_order_id}"
    }
  }

  if [type] == "order_item" {
    elasticsearch {
      hosts => ["http://localhost:9200"]
      # If Elasticsearch requires credentials, uncomment and adjust the two lines below.
      # user => elastic
      # password => "elastic@test.com"
      index => "shop_order_item"
      # Mapping types no longer exist from ES 7.0 on; drop this setting for 7.x and later.
      document_type => "order_item"
      document_id => "%{cod_order_item_id}"
    }
  }

  # Also print every event to the console, one JSON document per line.
  stdout {
    codec => json_lines
  }
}
//如果只有一張表的時候,單表output的配置:
# Output section to use instead when only one table is synchronised:
# no "type" routing is needed, every event goes to the same index.
output {
  elasticsearch {
    hosts => ["http://localhost:9200"]
    # If Elasticsearch requires credentials, uncomment and adjust the two lines below.
    # user => elastic
    # password => "elastic@test.com"
    index => "shop_order_mast"
    # Mapping types no longer exist from ES 7.0 on; drop this setting for 7.x and later.
    document_type => "order_mast"
    document_id => "%{cod_order_id}"
  }

  # Also print every event to the console, one JSON document per line.
  stdout {
    codec => json_lines
  }
}
//sql的寫法,這里只提供orderItem
-- order_item.sql: incremental query used by the "order_item" jdbc input.
SELECT
    -- The primary key is selected twice on purpose. This first, un-aliased column
    -- is the one referenced by document_id => "%{cod_order_item_id}" in
    -- logstash_order.conf and becomes the Elasticsearch document id; the aliased
    -- copy below is the regular "orderItemId" field of the document.
    `cod_order_item_id`,
    `cod_order_item_id`    AS "orderItemId",
    `cod_order_id`         AS "orderId",
    `flg_item_type`        AS "itemType",
    `cod_market_id`        AS "marketId",
    `cod_item_id`          AS "itemId",
    `cod_item_id_main`     AS "mainItemId",
    `txt_name`             AS "itemTitle",
    `cod_item_quantity`    AS "quantity",
    `amt_item`             AS "itemPrice",
    `cod_score_total`      AS "scoreTotal",
    `amt_score`            AS "scoreAmount",
    `amt_charge`           AS "chargeAmount",
    `amt_standard_price`   AS "standardPrice",
    `amt_balance_discount` AS "balanceDiscountAmount",
    `amt_payment_total`    AS "itemTotalAmount",
    `amt_coupon_total`     AS "couponTotalAmount",
    `amt_act_discount`     AS "actDiscountAmount",
    `cod_order_parent_id`  AS "parentOrderId",
    `cod_merchant_no`      AS "shopId",
    `cod_create_user`      AS "createUserId",
    DATE_FORMAT(`dat_modify`, '%Y-%m-%d %T') AS "updateTime",
    DATE_FORMAT(`dat_create`, '%Y-%m-%d %T') AS "createTime",
    `cod_modify_user`      AS "updateUserId"
FROM shop_order_item
-- :sql_last_value is filled in by Logstash from last_run_item.txt in the shop
-- folder. On the very first run that file is empty, so the value defaults to
-- 1970-01-01 and the run is effectively a full load.
-- NOTE(review): the comment is deliberately placed above the WHERE clause; with
-- jdbc_paging_enabled Logstash appears to wrap this statement in a sub-query, and
-- a trailing "--" comment on the last line could swallow the wrapper - confirm.
WHERE dat_modify >= :sql_last_value
7.運行過一次后,打開last_run_item.txt可以看到里面記錄了上一次同步的時間(也就是下一次查詢時:sql_last_value所使用的值)
8.啟動logstash:需要保證你的ES已經啟動了,並創建了對應的index和type
Windows環境:在安裝目錄的bin文件夾下打開命令窗口(或者先打開命令窗口,再切換到該路徑),然后執行: logstash -f ../config/shop/logstash_order.conf
如果是在linux環境,切換安裝的bin目錄執行:
# Run from the Logstash bin directory; "./" is required because the current directory
# is normally not on PATH, and 2>&1 sends error output to the same log file.
nohup ./logstash -f ../config/shop/logstash_order.conf > ../logs/logstash.out 2>&1 &
9.之后打開ES查詢數據
可以看到數據已經同步過來了
10.之后可以在項目中進行對應的數據操作了,因為該同步是一分鍾同步一次,所以對於實時性要求特別高的,可以在代碼中使用ES的crud操作也進行同步,這樣就可以保證萬無一失了
11.ES相關操作可以參考:https://www.cnblogs.com/yangxiaohui227/p/11237268.html
12.附上一個orderItem表的(ES版本為6.4.3)操作
@Configuration public class ElasticsearchConfig implements InitializingBean{ private static final Logger LOGGER = LoggerFactory.getLogger(ElasticsearchConfig.class); @Value("${elasticsearch.cluster.name}") private String clusterName; @Value("${elasticsearch.port}") private Integer port; @Value("${elasticsearch.host}") private String host; /** * Springboot整合Elasticsearch 在項目啟動前設置一下的屬性,防止報錯 * 解決netty沖突后初始化client時還會拋出異常 * java.lang.IllegalStateException: availableProcessors is already set to [4], rejecting [4] */ @PostConstruct void init() { System.setProperty("es.set.netty.runtime.available.processors", "false"); } // @Before @Bean public TransportClient getTransportClient() { TransportClient client=null; LOGGER.info("elasticsearch init."); try { Settings settings = Settings.builder() .put("cluster.name", clusterName) //集群名字 .put("client.transport.sniff", true)//增加嗅探機制,找到ES集群 .put("thread_pool.search.size", 5).build();//增加線程池個數 client = new PreBuiltTransportClient(settings); TransportAddress transportAddress = new TransportAddress(InetAddress.getByName(host), port); client.addTransportAddresses(transportAddress); LOGGER.info("elasticsearch init success."); return client; } catch (Exception e) { throw new RuntimeException("elasticsearch init fail."+ e); } }
}
//高級查詢對象 public class EsQueryObject { private String orderId; private String customerId; private String txtOrderTitle; private Integer orderStatus; private Integer paymentStatus; private String phone; private String recieveName; private String addresss; private String orderSubmitTime_S; private String orderSubmitTime_E; private String payTime_S; private String payTime_E; private BigDecimal minPayAmount; private BigDecimal maxPayAmount; private String shopId; private String itemId; private String itemTile; private Page page; public String getOrderId() { return orderId; } public void setOrderId(String orderId) { this.orderId = orderId; } public String getCustomerId() { return customerId; } public void setCustomerId(String customerId) { this.customerId = customerId; } public String getTxtOrderTitle() { return txtOrderTitle; } public void setTxtOrderTitle(String txtOrderTitle) { this.txtOrderTitle = txtOrderTitle; } public Integer getOrderStatus() { return orderStatus; } public void setOrderStatus(Integer orderStatus) { this.orderStatus = orderStatus; } public Integer getPaymentStatus() { return paymentStatus; } public void setPaymentStatus(Integer paymentStatus) { this.paymentStatus = paymentStatus; } public String getPhone() { return phone; } public void setPhone(String phone) { this.phone = phone; } public String getRecieveName() { return recieveName; } public void setRecieveName(String recieveName) { this.recieveName = recieveName; } public String getAddresss() { return addresss; } public void setAddresss(String addresss) { this.addresss = addresss; } public String getOrderSubmitTime_S() { return orderSubmitTime_S; } public void setOrderSubmitTime_S(String orderSubmitTime_S) { this.orderSubmitTime_S = orderSubmitTime_S; } public String getOrderSubmitTime_E() { return orderSubmitTime_E; } public void setOrderSubmitTime_E(String orderSubmitTime_E) { this.orderSubmitTime_E = orderSubmitTime_E; } public String getPayTime_S() { return payTime_S; } public void 
setPayTime_S(String payTime_S) { this.payTime_S = payTime_S; } public String getPayTime_E() { return payTime_E; } public void setPayTime_E(String payTime_E) { this.payTime_E = payTime_E; } public BigDecimal getMinPayAmount() { return minPayAmount; } public void setMinPayAmount(BigDecimal minPayAmount) { this.minPayAmount = minPayAmount; } public BigDecimal getMaxPayAmount() { return maxPayAmount; } public void setMaxPayAmount(BigDecimal maxPayAmount) { this.maxPayAmount = maxPayAmount; } public String getShopId() { return shopId; } public void setShopId(String shopId) { this.shopId = shopId; } public String getItemId() { return itemId; } public void setItemId(String itemId) { this.itemId = itemId; } public String getItemTile() { return itemTile; } public void setItemTile(String itemTile) { this.itemTile = itemTile; } public Page getPage() { return page; } public void setPage(Page page) { this.page = page; } }
package com.tft.shop.service.order; import com.alibaba.fastjson.JSON; import com.bootcrabframework.cloud.core.common.base.GenericBaseService; import com.bootcrabframework.cloud.core.util.CommonUtil; import com.bootcrabframework.cloud.core.util.DateUtil; import com.google.common.collect.Lists; import com.tft.shop.constant.order.OrderConstant; import com.tft.shop.entity.es.EsShopOrderItem; import com.tft.shop.entity.es.EsShopOrderItemRequestDTO; import com.tft.shop.entity.order.ShopOrderItem; import com.tft.shop.util.StringUtil; import org.elasticsearch.action.ActionFuture; import org.elasticsearch.action.bulk.BulkItemResponse; import org.elasticsearch.action.bulk.BulkRequest; import org.elasticsearch.action.bulk.BulkResponse; import org.elasticsearch.action.delete.DeleteRequest; import org.elasticsearch.action.delete.DeleteResponse; import org.elasticsearch.action.get.GetRequestBuilder; import org.elasticsearch.action.get.GetResponse; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.search.SearchType; import org.elasticsearch.action.update.UpdateResponse; import org.elasticsearch.client.transport.TransportClient; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.index.query.BoolQueryBuilder; import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.SearchHits; import org.elasticsearch.search.builder.SearchSourceBuilder; import org.elasticsearch.search.sort.FieldSortBuilder; import org.elasticsearch.search.sort.SortOrder; import org.springframework.stereotype.Service; import org.springframework.util.StringUtils; import javax.annotation.Resource; import java.util.ArrayList; import java.util.List; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; 
import java.util.stream.Collectors; @Service public class EsShopOrderItemService extends GenericBaseService { @Resource private TransportClient transportClient; //批量新增 public void batchInsert(List<EsShopOrderItem> list){ if(CommonUtil.isNull(list)){ return; } BulkRequest bulkRequest = new BulkRequest(); list.forEach(a->{ IndexRequest indexRequest = new IndexRequest(OrderConstant.ES_ORDER_ITEM_INDEX, OrderConstant.ES_ORDER_ITEM_TYPE, a.getOrderItemId()); indexRequest.source(JSON.toJSONString(a), XContentType.JSON); bulkRequest.add(indexRequest); }); ActionFuture<BulkResponse> bulk = transportClient.bulk(bulkRequest); boolean failures = bulk.actionGet().hasFailures(); if(!failures){ return; //沒有失敗 } //如果有失敗,輸出哪一條是失敗的 try { BulkResponse bulkItemResponses = bulk.get(); if(bulkItemResponses==null){ return; } if(CommonUtil.isNull(bulkItemResponses.getItems())){ return; } for (BulkItemResponse bulkItemResponse : bulkItemResponses.getItems()) { boolean failed = bulkItemResponse.isFailed(); if(failed){ logger.error("訂單項插入ES失敗,錯誤信息{},對應訂單項編號{}",bulkItemResponse.getId(),bulkItemResponse.getFailureMessage()); } } } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } } //單條新增 public void insertOne(EsShopOrderItem item){ if(null==item){ return; } List<EsShopOrderItem> list =Lists.newArrayList(); list.add(item); this.batchInsert(list); } //單條新增 public void insertOne(ShopOrderItem orderItem){ this.insertOne(shopOrderItemChangeToEsOrderItem(orderItem)); } private EsShopOrderItem shopOrderItemChangeToEsOrderItem(ShopOrderItem orderItem){ if(null==orderItem){ return null; } EsShopOrderItem shopOrderItem = new EsShopOrderItem(); shopOrderItem.setOrderItemId(orderItem.getCodOrderItemId()); shopOrderItem.setOrderId(orderItem.getCodOrderId()); shopOrderItem.setItemType(orderItem.getFlgItemType()); shopOrderItem.setMarketId(orderItem.getCodMarketId()); shopOrderItem.setItemId(orderItem.getCodItemId()); 
shopOrderItem.setMainItemId(orderItem.getCodItemIdMain()); shopOrderItem.setItemTitle(orderItem.getTxtName()); shopOrderItem.setQuantity(orderItem.getCodItemQuantity()); shopOrderItem.setItemPrice(orderItem.getAmtItem()); shopOrderItem.setScoreTotal(orderItem.getCodScoreTotal()); shopOrderItem.setScoreAmount(orderItem.getAmtScore()); shopOrderItem.setChargeAmount(orderItem.getAmtCharge()); shopOrderItem.setStandardPrice(orderItem.getAmtStandardPrice()); shopOrderItem.setBalanceDiscountAmount(orderItem.getAmtBalanceDiscount()); shopOrderItem.setItemTotalAmount(orderItem.getAmtPaymentTotal()); shopOrderItem.setActDiscountAmount(orderItem.getAmtActDiscount()); shopOrderItem.setCouponTotalAmount(orderItem.getAmtCouponTotal()); shopOrderItem.setParentOrderId(orderItem.getCodOrderParentId()); shopOrderItem.setShopId(orderItem.getCodMerchantNo()); shopOrderItem.setCreateUserId(orderItem.getCodCreateUser()); if(null!=orderItem.getDatCreate()){ shopOrderItem.setCreateTime(DateUtil.dateFormat(orderItem.getDatCreate(),DateUtil.TIME_FORMAT_FULL)); } if(null!=orderItem.getDatModify()){ shopOrderItem.setUpdateTime(DateUtil.dateFormat(orderItem.getDatModify(),DateUtil.TIME_FORMAT_FULL)); } shopOrderItem.setUpdateUserId(orderItem.getCodModifyUser()); return shopOrderItem; } //刪除 public void deleteOne(String orderItemId){ if(CommonUtil.isNull(orderItemId)){ return; } ActionFuture<DeleteResponse> actionFuture = transportClient.delete(new DeleteRequest(OrderConstant.ES_ORDER_ITEM_INDEX, OrderConstant.ES_ORDER_ITEM_TYPE, orderItemId)); if(actionFuture==null){ return; } DeleteResponse deleteResponse = actionFuture.actionGet(); if(null==deleteResponse || null==deleteResponse.status()){ return; } if(deleteResponse.status().getStatus()!=200){ logger.error("刪除ES訂單項,編號為{},刪除失敗",orderItemId); } } //修改 public void updateOne(EsShopOrderItem esShopOrderItem){ if(null==esShopOrderItem){ return; } UpdateResponse updateResponse = transportClient.prepareUpdate(OrderConstant.ES_ORDER_ITEM_INDEX, 
OrderConstant.ES_ORDER_ITEM_TYPE, esShopOrderItem.getOrderItemId()) .setDoc(JSON.toJSONString(esShopOrderItem), XContentType.JSON).execute().actionGet(); if(null==updateResponse || null==updateResponse.status()){ return; } if(updateResponse.status().getStatus()!=200){ logger.error("修改ES訂單項失敗,編號為{}",esShopOrderItem.getOrderItemId()); } } //修改 public void updateOne(ShopOrderItem orderItem){ this.updateOne(this.shopOrderItemChangeToEsOrderItem(orderItem)); } //查詢單個 public EsShopOrderItem selectById(String orderItemId){ if(StringUtil.isEmpty(orderItemId)){ return null; } GetRequestBuilder ret = transportClient.prepareGet(OrderConstant.ES_ORDER_ITEM_INDEX, OrderConstant.ES_ORDER_ITEM_TYPE, orderItemId); if(null==ret || null==ret.get()){ return null; } GetResponse response = ret.get(); if(StringUtil.isEmpty(response.getSourceAsString())){ return null; } return JSON.parseObject(response.getSourceAsString(),EsShopOrderItem.class); } /** * * * @param req 高級查詢對象,當用商品標題查詢的時候,限制只返回最大2000條 * @return */ public List<EsShopOrderItem> queryAdvanced(EsShopOrderItemRequestDTO req){ if(null==req){ return null; } SearchRequest searchRequest = new SearchRequest(OrderConstant.ES_ORDER_ITEM_INDEX); searchRequest.types(OrderConstant.ES_ORDER_ITEM_TYPE); // 構造查詢器 SearchSourceBuilder sourceBuilder = new SearchSourceBuilder(); BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery(); if(!StringUtils.isEmpty(req.getItemTitle())){ boolQueryBuilder.must(QueryBuilders.matchQuery("itemTitle",req.getItemTitle())); } if(!StringUtils.isEmpty(req.getItemId())){ boolQueryBuilder.must(QueryBuilders.termQuery("itemId",req.getItemId())); } if(!StringUtils.isEmpty(req.getShopId())){ boolQueryBuilder.must(QueryBuilders.termQuery("shopId",req.getShopId())); } if(!StringUtils.isEmpty(req.getCustomerId())){ boolQueryBuilder.must(QueryBuilders.termQuery("createUserId",req.getCustomerId())); } if(!StringUtils.isEmpty(req.getParentOrderId())){ 
boolQueryBuilder.must(QueryBuilders.termQuery("parentOrderId",req.getParentOrderId())); } if(!StringUtils.isEmpty(req.getOrderId())){ boolQueryBuilder.must(QueryBuilders.termQuery("orderId",req.getOrderId())); } if(null!=req.getItemType() && req.getItemType()>=0){ boolQueryBuilder.must(QueryBuilders.termQuery("itemType",req.getItemType())); } if(!StringUtils.isEmpty(req.getCreateStartTime())){ boolQueryBuilder.must(QueryBuilders.rangeQuery("createTime").gte(req.getCreateStartTime())); } if(!StringUtils.isEmpty(req.getCreateEndTime())){ boolQueryBuilder.must(QueryBuilders.rangeQuery("createTime").lte(req.getCreateEndTime())); } sourceBuilder.query(boolQueryBuilder); sourceBuilder.timeout(new TimeValue(60, TimeUnit.SECONDS)); if(!StringUtils.isEmpty(req.getItemTitle())){
//注意分頁from()的參數並不是頁碼,而是偏移量,如頁數為num時,偏移量=(num-1)* pageSize sourceBuilder.from(0).size(2000); } sourceBuilder.sort(new FieldSortBuilder("createTime").order(SortOrder.DESC)); searchRequest.source(sourceBuilder); searchRequest.searchType(SearchType.QUERY_THEN_FETCH); SearchResponse searchResponse = transportClient.search(searchRequest).actionGet(); if(null==searchResponse || null==searchResponse.getHits() || searchResponse.getHits().totalHits<=0){ return null; } List<EsShopOrderItem> list = new ArrayList<>(); SearchHits hits = searchResponse.getHits(); for (SearchHit hit : hits) { String sourceAsString = hit.getSourceAsString(); EsShopOrderItem orderItem = JSON.parseObject(sourceAsString, EsShopOrderItem.class); list.add(orderItem); } return list; } public List<String> queryOrderIdList(EsShopOrderItemRequestDTO req){ if(null==req){ return null; } List<EsShopOrderItem> shopOrderItems = this.queryAdvanced(req); if(CommonUtil.isNull(shopOrderItems)){ return null; } return shopOrderItems.stream().map(a->a.getOrderId()).collect(Collectors.toList()); } }
//附上shop_order_item的mapping配置:
put shop_order_item
{
"settings": {
"analysis": {
"analyzer": {
"thai_analyzer": {
"type": "custom",
"tokenizer": "thai",
"filter": [
"lowercase",
"asciifolding"
]
},
"caseSensitive": {
"filter": "lowercase",
"type": "custom",
"tokenizer": "keyword"
}
}
}
},
"mappings": {
"order_item": {
"properties": {
"orderId": {
"type": "keyword"
},
"parentOrderId": {
"type": "keyword"
},
"shopId": {
"type": "keyword"
},
"orderItemId": {
"type": "keyword"
},
"itemTitle": {
"type": "text",
"analyzer": "thai_analyzer",
"search_analyzer": "thai_analyzer"
},
"itemId": {
"type": "keyword"
},
"mainItemId": {
"type": "keyword"
},
"marketId": {
"type": "keyword"
},
"itemType": {
"type": "integer"
},
"quantity": {
"type": "integer"
},
"scoreTotal": {
"type": "integer"
},
"scoreAmount": {
"type": "double"
},
"chargeAmount": {
"type": "double"
},
"itemPrice": {
"type": "double"
},
"standardPrice": {
"type": "double"
},
"itemTotalAmount": {
"type": "double"
},
"couponTotalAmount": {
"type": "double"
},
"balanceDiscountAmount": {
"type": "double"
},
"actDiscountAmount": {
"type": "double"
},
"createTime": {
"format": "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd",
"type": "date"
},
"updateTime": {
"format": "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd",
"type": "date"
},
"createUserId": {
"type": "keyword"
},
"updateUserId": {
"type": "keyword"
}
}
}
}
}