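When maintaining indices in Elasticsearch, if your use case requires frequent, large-volume index writes, the maintenance approach described in the previous article is clearly inefficient. In that case, bulkIndex is the recommended way to improve throughput. The right batch size depends on your data set and on your cluster configuration.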
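As a rough illustration of how the batch size follows from the data set, the sketch below derives a documents-per-batch figure from an average document size and a byte budget per bulk request. The class `BulkSizing`, the method `docsPerBatch`, the 200-byte document size, and the 5 MB budget are all illustrative assumptions, not values taken from Elasticsearch or from this project; measure against your own cluster before settling on a number.

```java
final class BulkSizing {

    private BulkSizing() {
    }

    /**
     * Estimates how many documents fit into one bulk request.
     * Both inputs are assumptions supplied by the caller; nothing is read from the cluster.
     */
    static int docsPerBatch(long avgDocBytes, long targetBulkBytes) {
        if (avgDocBytes <= 0 || targetBulkBytes <= 0) {
            throw new IllegalArgumentException("sizes must be positive");
        }
        long docs = targetBulkBytes / avgDocBytes;
        // Always send at least one document, and stay within the int range.
        return (int) Math.max(1, Math.min(docs, Integer.MAX_VALUE));
    }

    public static void main(String[] args) {
        long avgDocBytes = 200;                   // hypothetical average JSON size of one document
        long targetBulkBytes = 5L * 1024 * 1024;  // hypothetical 5 MB budget per bulk request
        System.out.println("docs per batch: " + docsPerBatch(avgDocBytes, targetBulkBytes));
    }
}
```

A byte budget is only one possible starting point; the batch size that performs best still has to be found by testing with real documents.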
Below we build a sample project with Spring Boot and Elasticsearch, starting with the basic pom configuration:
<dependency>
    <groupId>com.google.code.gson</groupId>
    <artifactId>gson</artifactId>
    <version>1.4</version>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-elasticsearch</artifactId>
</dependency>
application.properties configuration:
#elasticsearch config
spring.data.elasticsearch.cluster-name:elasticsearch
spring.data.elasticsearch.cluster-nodes:192.168.1.105:9300
#application config
server.port=8080
spring.application.name=esp-app
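We need to define a domain entity and a basic Spring Data repository class for CRUD support. Mark the identifier field with the @Id annotation; if you do not specify an ID field, Elasticsearch cannot index your document. You also need to specify the index name and type, and the @Document annotation additionally lets us set the number of shards and replicas. Note that Elasticsearch index names must be lowercase.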
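import java.io.Serializable;
import java.math.BigDecimal;

import lombok.Data;
import org.springframework.data.annotation.Id;
import org.springframework.data.elasticsearch.annotations.Document;

@Data
// Index names must be lowercase; these values match the constants used in IndexerService
@Document(indexName = "car_index", type = "car_type", shards = 1, replicas = 0)
public class Car implements Serializable {

    /**
     * serialVersionUID:
     * @since JDK 1.6
     */
    private static final long serialVersionUID = 1L;

    // Used as the document ID in Elasticsearch
    @Id
    private Long id;
    private String brand;
    private String model;
    private BigDecimal amount;

    public Car(Long id, String brand, String model, BigDecimal amount) {
        this.id = id;
        this.brand = brand;
        this.model = model;
        this.amount = amount;
    }
}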
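Because the ID field becomes the document's identifier, two entities that share an ID end up as a single document: the later write replaces the earlier one. This matters for test data generated with random IDs. The sketch below imitates that behavior with a plain `HashMap` standing in for the index (an assumption made so the example runs without a cluster); `IdOverwriteDemo` and `distinctIds` are hypothetical names.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Random;

final class IdOverwriteDemo {

    private IdOverwriteDemo() {
    }

    /** Draws count IDs from [1, bound) and returns how many distinct IDs remain. */
    static int distinctIds(int count, int bound, long seed) {
        Random random = new Random(seed);
        // Stand-in for the index: one entry per document ID
        Map<Long, String> index = new HashMap<>();
        for (int i = 0; i < count; i++) {
            long id = 1 + random.nextInt(bound - 1);
            // Same ID: the newer document replaces the older one
            index.put(id, "car-" + i);
        }
        return index.size();
    }

    public static void main(String[] args) {
        int written = 10_000;
        int kept = distinctIds(written, 11_111, 42L);
        System.out.println("documents written: " + written + ", documents kept: " + kept);
    }
}
```

If every generated document has to survive, a sequential ID such as the loop counter avoids the collisions.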
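Next, define an IndexerService that handles indexing with bulk requests. Before indexing, check whether the index exists so that the operation does not fail with an exception. To get a better grasp of the Java API, this example uses ElasticsearchTemplate rather than the ElasticsearchRepository from the previous article; the template offers a richer set of operations.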
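import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.List;

import com.google.gson.Gson;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.commons.lang3.RandomUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.elasticsearch.core.ElasticsearchTemplate;
import org.springframework.data.elasticsearch.core.query.IndexQuery;
import org.springframework.stereotype.Service;

@Service
public class IndexerService {

    private static final String CAR_INDEX_NAME = "car_index";
    private static final String CAR_INDEX_TYPE = "car_type";

    @Autowired
    ElasticsearchTemplate elasticsearchTemplate;

    /** Indexes the test data in batches and returns the number of documents submitted. */
    public long bulkIndex() throws Exception {
        int counter = 0;
        try {
            // Create the index only if it does not exist yet
            if (!elasticsearchTemplate.indexExists(CAR_INDEX_NAME)) {
                elasticsearchTemplate.createIndex(CAR_INDEX_NAME);
            }
            Gson gson = new Gson();
            List<IndexQuery> queries = new ArrayList<IndexQuery>();
            List<Car> cars = assembleTestData();
            for (Car car : cars) {
                IndexQuery indexQuery = new IndexQuery();
                indexQuery.setId(car.getId().toString());
                indexQuery.setSource(gson.toJson(car));
                indexQuery.setIndexName(CAR_INDEX_NAME);
                indexQuery.setType(CAR_INDEX_TYPE);
                queries.add(indexQuery);
                // Count first, so that each submitted batch holds exactly 500 documents
                counter++;
                if (counter % 500 == 0) {
                    elasticsearchTemplate.bulkIndex(queries);
                    queries.clear();
                    System.out.println("bulkIndex counter : " + counter);
                }
            }
            // Do not forget to submit the final, partially filled batch
            if (!queries.isEmpty()) {
                elasticsearchTemplate.bulkIndex(queries);
            }
            elasticsearchTemplate.refresh(CAR_INDEX_NAME);
            System.out.println("bulkIndex completed.");
            return counter;
        } catch (Exception e) {
            System.out.println("IndexerService.bulkIndex error: " + e.getMessage());
            throw e;
        }
    }

    private List<Car> assembleTestData() {
        List<Car> cars = new ArrayList<Car>();
        // Generate 10,000 random test documents for the bulk write.
        // IDs are drawn at random from [1, 11111), so some will collide and overwrite each other.
        for (int i = 0; i < 10000; i++) {
            cars.add(new Car(RandomUtils.nextLong(1, 11111), RandomStringUtils.randomAscii(20),
                    RandomStringUtils.randomAlphabetic(15), BigDecimal.valueOf(78000)));
        }
        return cars;
    }
}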
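The batching pattern in bulkIndex (flush every N items, then flush the remainder) can be isolated from Elasticsearch and checked on its own. In the sketch below, a `Consumer<List<T>>` stands in for `elasticsearchTemplate.bulkIndex`; `BatchFlusher` and `flushInBatches` are hypothetical names and not part of Spring Data Elasticsearch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

final class BatchFlusher {

    private BatchFlusher() {
    }

    /** Hands items to sink in chunks of at most batchSize and returns the number of chunks. */
    static <T> int flushInBatches(List<T> items, int batchSize, Consumer<List<T>> sink) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        int batches = 0;
        List<T> buffer = new ArrayList<>(batchSize);
        for (T item : items) {
            buffer.add(item);
            if (buffer.size() == batchSize) {
                // Pass a copy, because the buffer is cleared and reused
                sink.accept(new ArrayList<>(buffer));
                buffer.clear();
                batches++;
            }
        }
        // The last batch is usually only partially filled
        if (!buffer.isEmpty()) {
            sink.accept(new ArrayList<>(buffer));
            batches++;
        }
        return batches;
    }

    public static void main(String[] args) {
        List<Integer> items = new ArrayList<>();
        for (int i = 0; i < 1200; i++) {
            items.add(i);
        }
        List<Integer> sizes = new ArrayList<>();
        int batches = flushInBatches(items, 500, batch -> sizes.add(batch.size()));
        System.out.println(batches + " batches with sizes " + sizes);
    }
}
```

With 1,200 items and a batch size of 500, the sink receives batches of 500, 500, and 200. Checking the buffer size instead of a running counter avoids the off-by-one risk of testing a counter before it has been incremented.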
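The rest is straightforward: write a RestController that accepts a request to trigger the test, or a CommandLineRunner that invokes the method above when the application starts.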
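import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;

@SpringBootApplication
@RestController
public class ESPApplicatoin {

    public static void main(String[] args) {
        SpringApplication.run(ESPApplicatoin.class, args);
    }

    @Autowired
    IndexerService indexService;

    // POST /bulkIndex triggers one bulk indexing run
    @RequestMapping(value = "bulkIndex", method = RequestMethod.POST)
    public void bulkIndex() {
        try {
            indexService.bulkIndex();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}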
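The CommandLineRunner class:
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;

@Component
public class AppLoader implements CommandLineRunner {

    @Autowired
    IndexerService indexerService;

    // Runs once after the application context has started
    @Override
    public void run(String... strings) throws Exception {
        indexerService.bulkIndex();
    }
}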
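Once the run finishes, open http://localhost:9200/car_index/_search/ to check whether the documents were actually indexed. Note: pay close attention to version compatibility. With Elasticsearch 5+, the Spring Data Elasticsearch approach used here will not work, as the compatibility table below shows.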
Spring Boot Version (x) | Spring Data Elasticsearch Version (y) | Elasticsearch Version (z) |
---|---|---|
x <= 1.3.5 | y <= 1.3.4 | z <= 1.7.2* |
x >= 1.4.x | 2.0.0 <=y < 5.0.0** | 2.0.0 <= z < 5.0.0** |
(*) - require manual change in your project pom file (solution 2.)
(**) - Next big ES release with breaking changes
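The `_search` check mentioned above can also be scripted. The following is a minimal sketch that uses only the JDK's `HttpURLConnection`; `SearchCheck`, `searchUrl`, and `fetch` are hypothetical names, and the host, port, and timeouts are assumptions that have to match your own cluster.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;

final class SearchCheck {

    private SearchCheck() {
    }

    /** Builds the _search URL for an index, for example http://localhost:9200/car_index/_search */
    static String searchUrl(String host, int port, String indexName) {
        return "http://" + host + ":" + port + "/" + indexName + "/_search";
    }

    /** Performs a GET request and returns the response body as a string. */
    static String fetch(String url) throws IOException {
        HttpURLConnection connection = (HttpURLConnection) new URL(url).openConnection();
        connection.setRequestMethod("GET");
        connection.setConnectTimeout(2000); // assumed timeout, in milliseconds
        connection.setReadTimeout(5000);    // assumed timeout, in milliseconds
        try (BufferedReader reader = new BufferedReader(
                new InputStreamReader(connection.getInputStream(), StandardCharsets.UTF_8))) {
            StringBuilder body = new StringBuilder();
            String line;
            while ((line = reader.readLine()) != null) {
                body.append(line).append('\n');
            }
            return body.toString();
        } finally {
            connection.disconnect();
        }
    }

    public static void main(String[] args) {
        String url = searchUrl("localhost", 9200, "car_index");
        try {
            System.out.println(fetch(url));
        } catch (IOException e) {
            // Reached when no node is listening or the index does not exist
            System.out.println("Could not query " + url + ": " + e.getMessage());
        }
    }
}
```

The JSON response reports the number of matching documents, which should correspond to the number of distinct IDs that were indexed.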
Example project: https://github.com/backkoms/spring-boot-elasticsearch
