1.簡介
ELK(elasticsearch+logstash+kibana)是目前比較常用的日志分析系統,包括日志收集(logstash),日志存儲搜索(elasticsearch),展示查詢(kibana),使用ELK作為日志的存儲分析系統並通過為每個請求分配requestId鏈接相關日志。ELK具體結構如下圖所示:
1.1es基本概念
Index
類似於mysql數據庫中的database
Type
類似於mysql數據庫中的table表,es中可以在Index中建立type(table),通過mapping進行映射。
Document
由於es存儲的數據是文檔型的,一條數據對應一篇文檔即相當於mysql數據庫中的一行數據row, 一個文檔中可以有多個字段也就是mysql數據庫一行可以有多列。
Field
es中一個文檔中對應的多個列與mysql數據庫中每一列對應
Mapping
可以理解為mysql或者solr中對應的schema,只不過有些時候es中的mapping增加了動態識別功能,感覺很強大的樣子, 其實實際生產環境上不建議使用,最好還是開始制定好了對應的schema為主。
indexed
就是名義上的建立索引。mysql中一般會對經常使用的列增加相應的索引用於提高查詢速度,而在es中默認都是會加 上索引的,除非你特殊制定不建立索引只是進行存儲用於展示,這個需要看你具體的需求和業務進行設定了。
Query DSL
類似於mysql的sql語句,只不過在es中是使用的json格式的查詢語句,專業術語就叫:QueryDSL
GET/PUT/POST/DELETE
分別類似與mysql中的select/update/delete......
1.2es架構
elasticsearch提供了一個分布式多用戶能力的全文搜索引擎,基於RESTfulweb接口。ElasticSearch是用Java開發的, 並作為Apache許可條款下的開放源碼發布,是當前流行的企業級搜索引擎。設計用於雲計算中,能夠達到實時搜索,穩定,可靠,快速,安裝使用方便。構建在全 文檢索開源軟件Lucene之上的Elasticsearch,不僅能對海量規模的數據完成分布式索引與檢索,還能提供數據聚合分析。據國際權威的 數據庫產品評測機構DBEngines的統計,在2016年1月,Elasticsearch已超過Solr等,成為排名第一的搜索引擎類應用 概括:基於Restful標准的高擴展高可用的實時數據分析的全文搜索工具,elasticsearch架構
Gateway層
es用來存儲索引文件的一個文件系統且它支持很多類型,例如:本地磁盤、共享存儲(做snapshot的時候需要用到)、hadoop 的hdfs分布式存儲、亞馬遜的S3。它的主要職責是用來對數據進行長持久化以及整個集群重啟之后可以通過gateway重新恢復數據。
Distributed Lucene Directory
Gateway上層就是一個lucene的分布式框架,lucene是做檢索的,但是它是一個單機的搜索引擎,像這種es分布式搜索引擎系 統,雖然底層用lucene,但是需要在每個節點上都運行lucene進行相應的索引、查詢以及更新,所以需要做成一個分布式的運 行框架來滿足業務的需要。
四大模塊組件
districted lucene directory之上就是一些es的模塊
1. Index Module是索引模塊,就是對數據建立索引也就是通常所說的建立一些倒排索引等;
2. Search Module是搜索模塊,就是對數據進行查詢搜索;
3. Mapping模塊是數據映射與解析模塊,就是你的數據的每個字段可以根據你建立的表結構 通過mapping進行映射解析,如果你沒有建立表結構,es就會根據你的數據類型推測你 的數據結構之后自己生成一個mapping,然后都是根據這個mapping進行解析你的數據;
4. River模塊在es2.0之后應該是被取消了,它的意思表示是第三方插件,例如可以通過一 些自定義的腳本將傳統的數據庫(mysql)等數據源通過格式化轉換后直接同步到es集群里, 這個river大部分是自己寫的,寫出來的東西質量參差不齊,將這些東西集成到es中會引發 很多內部bug,嚴重影響了es的正常應用,所以在es2.0之后考慮將其去掉。
Discovery、Script
es4大模塊組件之上有 Discovery模塊:es是一個集群包含很多節點,很多節點需要互相發現對方,然后組成一個集群包括選 主的,這些es都是用的discovery模塊,默認使用的是Zen,也可是使用EC2;es查詢還可以支撐多種script即腳本語言,包括 mvel、js、python等等。
Transport協議層
再上一層就是es的通訊接口Transport,支持的也比較多:Thrift、Memcached以及Http,默認的是http,JMX就是java的一個 遠程監控管理框架,因為es是通過java實現的。
RESTful接口層
最上層就是es暴露給我們的訪問接口,官方推薦的方案就是這種Restful接口,直接發送http請求,方便后續使用nginx做代理、 分發包括可能后續會做權限的管理,通過http很容易做這方面的管理。如果使用java客戶端它是直接調用api,在做負載均衡以 及權限管理還是不太好做。
1.3RestfulAPI
一種軟件架構風格、設計風格,而不是標准,只是提供了一組設計原則和約束條件。它主要用於客戶端和服務器交互類的軟件。 基於這個風格設計的軟件可以更簡潔,更有層次,更易於實現緩存等機制。在目前主流的三種Web服務交互方案中,REST相比於S OAP(Simple Object Access protocol,簡單對象訪問協議)以及XML-RPC更加簡單明了。
接口類型:
(Representational State Transfer 意思是:表述性狀態傳遞)
它使用典型的HTTP方法,諸如GET,POST.DELETE,PUT來實現資源的獲取,添加,修改,刪除等操作。即通過HTTP動詞來實現資源的狀態扭轉
GET 用來獲取資源
POST 用來新建資源(也可以用於更新資源)
PUT 用來更新資源
DELETE 用來刪除資源
1.4CRUL命令
以命令的方式執行HTTP協議的請求
GET/POST/PUT/DELETE
示例:
訪問一個網頁
curl www.baidu.com
curl -o tt.html www.baidu.com
顯示響應的頭信息
curl -i www.baidu.com
顯示一次HTTP請求的通信過程
curl -v www.baidu.com
執行GET/POST/PUT/DELETE操作
curl -X GET/POST/PUT/DELETE url
2.安裝logstash
2.1安裝jdk
logstash需要依賴jdk,安裝logstash之前先安裝java環境。下載JDK,
在oracle的官方網站下載,http://www.oracle.com/technetwork/java/javase/downloads/jdk8-downloads-2133151.html,根據操作系統的版本下載對應的JDK安裝包,本次實驗下載的是jdk-8u101-linux-x64.tar.gz
上傳文件到服務器並執行:
# mkdir /usr/local/java
# tar -zxf jdk-8u45-linux-x64.tar.gz -C /usr/local/java/
配置java環境:
export JAVA_HOME=/usr/local/java/jdk1.8.0_45
export PATH=$PATH:$JAVA_HOME/bin
export CLASSPATH=.:$JAVA_HOME/lib/tools.jar:$JAVA_HOME/lib/dt.jar:$CLASSPATH
執行java -version命令,打印出java版本信息表示JDK配置成功。
# java -version
2.2下載logstash
下載並解壓
# wget https://download.elastic.co/logstash/logstash/logstash-2.4.0.tar.gz
# tar -xzvf logstash-2.4.0.tar.gz
進入安裝目錄: cd #{dir}/logstash-2.4.0,創建logstash測試配置文件:
# vim test.conf
編輯內容如下:
input {
stdin { }
}
output {
stdout {
codec => rubydebug {}
}
}
運行logstash測試:
# bin/logstash -f test.conf
顯示:
證明logstash已經啟動了,
輸入hello world
因為我們配置內容為,控制台輸出日志內容,所以顯示以上格式即為成功。
3. 安裝elasticsearch
下載安裝包:
wget https://download.elastic.co/elasticsearch/release/org/elasticsearch/distribution/
tar/elasticsearch/2.4.0/elasticsearch-2.4.0.tar.gz
解壓並配置:
# tar -xzvf elasticsearch-2.4.0.tar.gz
# cd #{dir}/elasticsearch-2.4.0
# vim config/elasticsearch.yml
修改:
path.data: /data/es #數據路徑
path.logs: /data/logs/es #日志路徑
network.host: 本機地址 #服務器地址
http.port: 9200 #端口
配置執行用戶和目錄:
groupadd elsearch
useradd elsearch -g elsearch -p elasticsearch
chown -R elsearch:elsearch elasticsearch-2.4.0
mkdir /data/es
mkdir /data/logs/es
chown -R elsearch:elsearch /data/es
chown -R elsearch:elsearch /data/logs/es
啟動elasticsearch:
# su elsearch
# bin/elasticsearch
通過瀏覽器訪問:
安裝成功.
集成logstash和elasticsearch,修改Logstash配置為:
input {
stdin { }
}
output {
elasticsearch {
hosts => "elasticsearchIP:9200"
index => "logstash-test"
}
stdout {
codec => rubydebug {}
}
}
再次啟動logstash,並輸入任意文字:“hello elasticsearch”
通過elasticsearch搜索到了剛才輸入的文字,集成成功。通過elasticsearch的原生接口查詢和展示都不夠便捷直觀,下面我們配置一下更方便的查詢分析工具kibana。
4.安裝kibana
下載安裝包:
wget https://download.elastic.co/kibana/kibana/kibana-4.6.1-linux-x86_64.tar.gz
解壓kibana,並進入解壓后的目錄
打開config/kibana.yml,修改如下內容
#啟動端口 因為端口受限 所以變更了默認端口
server.port: 8601
#啟動服務的ip
server.host: “本機ip”
#elasticsearch地址
elasticsearch.url: http://elasticsearchIP:9200
啟動程序:
bin/kibana
訪問配置的ip:port,在discover中搜索剛才輸入的字符,內容非常美觀的展示了出來。
到這里elk環境已經配置完成,把自己java web項目試驗日志在elk中的使用。
5.創建web工程
一個普通的maven java web工程,為了測試分布式系統日志的連續性,我們讓這個項目自調用n次,並部署2個項目,相互調用,關鍵代碼如下:
controller
@RequestMapping("http_client")
@Controller
public class HttpClientTestController {
@Autowired
private HttpClientTestBo httpClientTestBo;
@RequestMapping(method = RequestMethod.POST)
@ResponseBody
public BaseResult doPost(@RequestBody HttpClientTestResult result) {
HttpClientTestResult testPost = httpClientTestBo.testPost(result);
return testPost;
}
}
service
@Service
public class HttpClientTestBo {
private static Logger logger = LoggerFactory.getLogger(HttpClientTestBo.class);
@Value("${test_http_client_url}")
private String testHttpClientUrl;
public HttpClientTestResult testPost(HttpClientTestResult result) {
logger.info(JSONObject.toJSONString(result));
result.setCount(result.getCount() + 1);
if (result.getCount() <= 3) {
Map<String, String> headerMap = new HashMap<String, String>();
String requestId = RequestIdUtil.requestIdThreadLocal.get();
headerMap.put(RequestIdUtil.REQUEST_ID_KEY, requestId);
Map<String, String> paramMap = new HashMap<String, String>();
paramMap.put("status", result.getStatus() + "");
paramMap.put("errorCode", result.getErrorCode());
paramMap.put("message", result.getMessage());
paramMap.put("count", result.getCount() + "");
String resultString = JsonHttpClientUtil.post(testHttpClientUrl, headerMap, paramMap, "UTF-8");
logger.info(resultString);
}
logger.info(JSONObject.toJSONString(result));
return result;
}
}
為了表示調用的鏈接性我們在web.xml中配置requestId的filter,用於創建
/*
* requestId
* requestIdFilter
* com.virxue.baseweb.utils.RequestIdFilter
* requestIdFilter
/*
public class RequestIdFilter implements Filter {
private static final Logger logger = LoggerFactory.getLogger(RequestIdFilter.class);
/* (non-Javadoc)
* @see javax.servlet.Filter#init(javax.servlet.FilterConfig)
*/
public void init(FilterConfig filterConfig) throws ServletException {
logger.info("RequestIdFilter init");
}
/* (non-Javadoc)
* @see javax.servlet.Filter#doFilter(javax.servlet.ServletRequest, javax.servlet.ServletResponse, javax.servlet.FilterChain)
*/
public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain) throws IOException,
ServletException {
String requestId = RequestIdUtil.getRequestId((HttpServletRequest) request);
MDC.put("requestId", requestId);
chain.doFilter(request, response);
RequestIdUtil.requestIdThreadLocal.remove();
MDC.remove("requestId");
}
/* (non-Javadoc)
* @see javax.servlet.Filter#destroy()
*/
public void destroy() {
}
}
utils
public class RequestIdUtil {
public static final String REQUEST_ID_KEY = "requestId";
public static ThreadLocal<String> requestIdThreadLocal = new ThreadLocal<String>();
private static final Logger logger = LoggerFactory.getLogger(RequestIdUtil.class);
/**
* 獲取requestId
* @Title getRequestId
* @Description TODO
* @return
*
* @author zhengkai
* @date 2019年10月3日 下午21:58:28
*/
public static String getRequestId(HttpServletRequest request) {
String requestId = null;
String parameterRequestId = request.getParameter(REQUEST_ID_KEY);
String headerRequestId = request.getHeader(REQUEST_ID_KEY);
if (parameterRequestId == null && headerRequestId == null) {
logger.info("request parameter 和header 都沒有requestId入參");
requestId = UUID.randomUUID().toString();
} else {
requestId = parameterRequestId != null ? parameterRequestId : headerRequestId;
}
requestIdThreadLocal.set(requestId);
return requestId;
}
}
我們使使用了Logback作為日志輸出的插件,並且使用它的MDC類,可以無侵入的在任何地方輸出requestId,具體的配置如下:
UTF-8
${log_base}/java-base-web.log
${log_base}/java-base-web-%d{yyyy-MM-dd}-%i.log 10 200MB
%d^|^%X{requestId}^|^%-5level^|^%logger{36}%M^|^%msg%n
這里的日志格式使用了“^|^”做為分隔符,方便logstash進行切分。在測試服務器部署2個web項目,並且修改日志輸出位置,並修改url調用鏈接使項目相互調用。
6. 修改logstash讀取項目輸出日志
新增stdin.conf,內容如下:
input {
file {
path => ["/data/logs/java-base-web1/java-base-web.log", "/data/logs/java-base-web2/java-base-web.log"]
type => "logs"
start_position => "beginning"
codec => multiline {
pattern => "^\[\d{4}-\d{1,2}-\d{1,2}\s\d{1,2}:\d{1,2}:\d{1,2}"
negate => true
what => "next"
}
}
}
filter{
mutate{
split=>["message","^|^"]
add_field => {
"messageJson" => "{datetime:%{[message][0]}, requestId:%{[message][1]},level:%{[message][2]}, class:%{[message][3]}, content:%{[message][4]}}"
}
remove_field => ["message"]
}
}
output {
elasticsearch {
hosts => "10.160.110.48:9200"
index => "logstash-${type}"
}
stdout {
codec => rubydebug {}
}
}
其中path為日志文件地址;codec => multiline為處理Exception日志,使換行的異常內容和異常頭分割在同一個日志中;filter為日志內容切分,把日志內容做為json格式,方便查詢分析,測試一下: