ELK相關TODO
Kafka相關TODO
前言
快速開始文檔中,講解了ELK三個組件的下載、安裝、配置、啟動等過程。只要按照文章走一下,就可以看到一個單機版的ELK三件套。本文會帶你整合SpringBoot、ELK、Kafka,組成最常見的日志系統。當然,這套組合不僅能作為日志系統,也能作為大數據流處理的前半部分(數據的收集)。后面也會帶來大數據相關的隨筆文章。本文也會附帶相關源碼,鏈接如下:
依賴導入
雖然整合的是SpringBoot,但是為了方便前期學習、理解,我們就不用SpringData Elasticsearch的starter了。在熟悉了ES官方提供的Java客戶端后,可以再使用SpringData Elasticsearch,其提供了很多非常方便的注解。除了注解,還有starter提供的自動配置等功能。更多相關功能和用法可以自行去查看Spring Data官方文檔或相關博客。
這里我們只導入ES提供的Java客戶端,然后手動去初始化ES。注意導入的版本,最好和服務器的ES版本保持一致。但是由於前段時間log4j的漏洞,導致7.14之前的版本全都被遺棄了,所以這里最低只能導入7.14版本。不過只要版本差距不大,一般不會有問題。
<!-- Elasticsearch服務 -->
<!-- 生成環境中,依賴版本最好和服務器的ES的版本保持一致,因為log4j的漏洞,7.14.0之前的部分依賴被遺棄無法成功下載-->
<dependency>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch</artifactId>
<version>7.14.0</version>
</dependency>
<!-- Elasticsearch Java高級客戶端 -->
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-high-level-client</artifactId>
<version>7.14.0</version>
</dependency>
初始化ES客戶端
首先去yml或properties添加配置信息。這個配置key不是上面的jar包提供的,而是我們自己自定義的,然后通過@Value注解獲取值。所以你的key不一定要和我一樣。配置如下:
elasticsearch:
host: ip
port: port
然后初始化RestHighLevelClient即可:
@Configuration
@Slf4j
public class ElasticSearchConfig{
@Value("${elasticsearch.host}")
private String host;
@Value("${elasticsearch.port}")
private int port;
@Bean
public RestHighLevelClient restHighLevelClient() {
RestHighLevelClient restHighLevelClient = null;
try {
log.info("elasticsearch start init...");
restHighLevelClient = new RestHighLevelClient(RestClient.builder(new HttpHost(host, port, "http")));
log.info("elasticsearch init success!");
}catch (Exception e){
log.error("elasticsearch init had exception:{}", ExceptionUtils.getStackTrace(e));
}
return restHighLevelClient;
}
}
造數據
自己手動編兩個數據總覺得不帶勁,一是數據量太少,二是太麻煩。我這里推薦一個開源的,自動生成數據的工具,依賴如下,記得排除snakeyaml,可能會和你的springboot中的yaml產生沖突。
<dependency>
<groupId>com.github.javafaker</groupId>
<artifactId>javafaker</artifactId>
<version>1.0.2</version>
<exclusions>
<exclusion>
<artifactId>snakeyaml</artifactId>
<groupId>org.yaml</groupId>
</exclusion>
</exclusions>
</dependency>
造數據的工具類和實體類很簡單,直接貼一下代碼:
@Data
@Accessors(chain = true)
public class User {
private Long id;
private String traceId;
private String name;
private String birthday;
private String job;
private String address;
private String company;
}
public class GenerateUserUtil {
private static final Faker faker = new Faker();
public static User generate(){
return new User()
.setId(System.currentTimeMillis())
.setTraceId(UUID.randomUUID().toString())
.setName(faker.name().name())
.setBirthday(DateFormat.format(faker.date().birthday(18,60)))
.setJob(faker.job().title())
.setAddress(faker.address().fullAddress())
.setCompany(faker.company().name());
}
}
往ES寫數據
往ES寫數據之前,需要新建索引、定義mapping。根據你的實體類然后定義mapping即可。下面一共有三個類,ESConstant中定義了索引常量字符串和mapping。ESUtil封裝了RestHighLevelClient,向外提供了創建索引和添加文檔兩個方法。WriteLogService模擬業務的服務類,不停產生數據和寫日志。
public class ESConstant {
public static final String ES_USER_INDEX_PREFIX = "user";
public static final String MAPPING ="{\n" +
" \"properties\": {\n" +
" \"id\":{\n" +
" \"type\": \"long\"\n" +
" },\n" +
" \"traceId\":{\n" +
" \"type\": \"keyword\"\n" +
" },\n" +
" \"name\":{\n" +
" \"type\": \"text\"\n" +
" , \"analyzer\": \"standard\"\n" +
" },\n" +
" \"birthday\":{\n" +
" \"type\": \"date\"\n" +
" },\n" +
" \"job\":{\n" +
" \"type\": \"text\"\n" +
" , \"analyzer\": \"standard\"\n" +
" },\n" +
" \"address\":{\n" +
" \"type\": \"text\"\n" +
" , \"analyzer\": \"standard\"\n" +
" },\n" +
" \"company\":{\n" +
" \"type\": \"text\"\n" +
" , \"analyzer\": \"standard\"\n" +
" }\n" +
" }\n" +
" }";
}
@Component
public class ESUtil {
@Autowired
RestHighLevelClient restHighLevelClient;
public void createIndex(String indexName,String mapping,int shards,int replicas) throws IOException {
CreateIndexRequest createIndexRequest = new CreateIndexRequest(indexName);
//設置索引的配置,1個分片1個副本。由於我們是單機ES,這個配置無關緊要,正式的線上環境記得要配置
HashMap<String,String> indexOption = new HashMap<>();
indexOption.put("index.number_of_shards",String.valueOf(shards));
indexOption.put("index.number_of_replicas",String.valueOf(replicas));
createIndexRequest.settings(indexOption);
//設置索引mapping,即字段的定義
createIndexRequest.mapping(mapping, XContentType.JSON);
CreateIndexResponse createIndexResponse = restHighLevelClient.indices().create(createIndexRequest, RequestOptions.DEFAULT);
}
public void addDocument(String document,String indexName) throws IOException {
IndexRequest indexRequest = new IndexRequest(indexName);
indexRequest.source(document,XContentType.JSON);
restHighLevelClient.index(indexRequest, RequestOptions.DEFAULT);
}
}
@Service
@Slf4j
public class WriteLogService implements CommandLineRunner{
@Autowired
RestHighLevelClient restHighLevelClient;
@Autowired
ESUtil esUtil;
private static final Gson gson = new GsonBuilder().serializeNulls().create();
@Override
public void run(String... args) {
try {
//運行前檢查索引是否存在,不存在就新建一個
if (!restHighLevelClient.indices().exists(new GetIndexRequest(ES_USER_INDEX_PREFIX), RequestOptions.DEFAULT)) {
esUtil.createIndex(ES_USER_INDEX_PREFIX, MAPPING, 1, 1);
}
while (true) {
String user = gson.toJson(GenerateUserUtil.generate());
log.info("generate user:{}", user);
esUtil.addDocument(user, ES_USER_INDEX_PREFIX);
Thread.sleep(1000);
}
}catch (Exception e){
log.error("service had exception:{}", ExceptionUtils.getStackTrace(e));
}
}
}
寫入成功之后,就可以去Kibana的index Manager中添加user索引了。