文章轉載自:https://mp.weixin.qq.com/s/W9b28CFBEmxBPz5bGd1-hw
教程pdf文件下載地址
https://files.cnblogs.com/files/sanduzxcvbnm/ELK基於ElastAlert實現日志的微信報警.pdf
官方文檔下載地址
https://files.cnblogs.com/files/sanduzxcvbnm/elastalert-readthedocs-io-en-latest.pdf
注意事項:
1.文章中凡是用python的均表示的是使用python3的版本,但是不能使用python3,因為安裝的elastalert模塊中好多文件開頭用的是#!/usr/bin/env python,或者沒有添加這個
也就是說假若系統使用雙版本的python,python表示的是2版本,python3表示的是3版本,需要把python表示的2的版本鏈接給刪除,換成python3的,最終結果是執行python出現的是3的版本信息。
2.關於elastalert_modules文件夾,並不是放在elastalert在python3的安裝路徑下,放在~/elastalert
路徑下了,然后進入到~/elastalert
路徑下,執行命令:python -m elastalert.elastalert --verbose --config /app/elastalert/config.yaml --rule /app/elastalert/example_rules/sms-applog.yaml
一、ElastAlert介紹
在日志管理上我們使用Elasticsearch,Logstash和Kibana技術棧來管理不斷增長的數據和日志,但是對於錯誤日志的監控ELK架構並沒有提供,所以我們需要使用到第三方工具ElastAlert,來幫助我們及時發現業務中存在的問題。
ElastAlert通過定期查詢Elasticsearch,並將數據傳遞到規則類型,該規則類型確定何時找到匹配項。發生匹配時,將為該警報提供一個或多個警報,這些警報將根據匹配采取行動。
這是由一組規則配置的,每個規則定義一個查詢,一個規則類型和一組警報。
ElastAlert支持以下方式報警
- Command (可調用短信接口)
- JIRA
- OpsGenie
- SNS
- HipChat
- Slack
- Telegram
- Debug
- Stomp
除了這種基本用法外,還有許多其他功能使警報更加有用:
- 警報鏈接到Kibana儀表板
- 任意字段的合計計數
- 將警報合並為定期報告
- 通過使用唯一鍵字段來分隔警報
- 攔截並增強比賽數據
二、部署ElastAlert
. 1. 部署所需環境
ELK 環境部署
EFK6.3+kafka+logstash日志分析平台集群
安裝依賴包
yum -y install zlib-devel bzip2-devel openssl-devel ncurses-devel sqlite-devel readline-devel tk-devel gdbm-devel db4-devel libpcap-devel xz-devel
部署python3.6
mkdir -p /usr/local/python3
cd /usr/local/python3
wget https://www.python.org/ftp/python/3.6.9/Python-3.6.9.tgz
tar xf Python-3.6.9.tgz
cd Python-3.6.9
./configure --prefix=/usr/local/python3
make && make install
配置環境變量
# 將python2.7 軟鏈刪除,換成python3.6
rm /usr/bin/python
ln -s /usr/local/python3/bin/python3.6 /usr/bin/python
rm /usr/bin/pip
ln -s /usr/local/python3/bin/pip3 /usr/bin/pip
# 最終效果
[root@docker Python-3.6.9]# ll /usr/bin/python*
lrwxrwxrwx 1 root root 32 Sep 4 09:57 /usr/bin/python -> /usr/local/python3/bin/python3.6
lrwxrwxrwx. 1 root root 9 Oct 10 2019 /usr/bin/python2 -> python2.7
-rwxr-xr-x. 1 root root 7216 Aug 7 2019 /usr/bin/python2.7
[root@docker Python-3.6.9]# ll /usr/bin/pip*
lrwxrwxrwx 1 root root 27 Sep 4 09:57 /usr/bin/pip -> /usr/local/python3/bin/pip3
-rwxr-xr-x 1 root root 284 Jan 29 2020 /usr/bin/pip2
-rwxr-xr-x 1 root root 288 Jan 29 2020 /usr/bin/pip2.7
驗證版本
# python
Python 3.6.9 (default, Jun 2 2020, 12:12:43)
[GCC 4.4.7 20120313 (Red Hat 4.4.7-18)] on linux
Type "help", "copyright", "credits" or "license" for more information.
>>>
# pip -V
pip 18.1 from /usr/local/python3/lib/python3.6/site-packages/pip (python 3.6)
. 2. 部署ElastAlert
cd /app
git clone https://github.com/Yelp/elastalert.git
安裝模塊:
cd elastalert
pip install "setuptools>=11.3"
python setup.py install
pip install -r requirements.txt
安裝完后,會在 /usr/local/bin/ 或者 /usr/local/python3/bin/ 下生成4個elastalert命令
$ ll /usr/local/bin/elastalert*
-rwxr-xr-x 1 root root 396 2月 14 10:03 /usr/local/bin/elastalert
-rwxr-xr-x 1 root root 422 2月 14 10:03 /usr/local/bin/elastalert-create-index
-rwxr-xr-x 1 root root 430 2月 14 10:03 /usr/local/bin/elastalert-rule-from-kibana
-rwxr-xr-x 1 root root 416 2月 14 10:03 /usr/local/bin/elastalert-test-rule
根據Elasticsearch的版本,您可能需要手動安裝正確版本的elasticsearch-py。
Elasticsearch 5.0+:
pip install "elasticsearch>=5.0.0"
Elasticsearch 2.X:
pip install "elasticsearch<3.0.0"
. 3. 配置ElastAlert
配置config.yaml 文件
# cp config.yaml.example config.yaml
# cat config.yaml
rules_folder: example_rules
run_every:
seconds: 10
buffer_time:
minutes: 15
es_host: 10.1.144.208
es_port: 9201
#es_username: elastic
#es_password: 123456
writeback_index: elastalert_status
alert_time_limit:
days: 2
rules_folder
:ElastAlert從中加載規則配置文件的位置。它將嘗試加載文件夾中的每個.yaml文件。沒有任何有效規則,ElastAlert將無法啟動。
run_every
:ElastAlert多久查詢一次Elasticsearch的時間。
buffer_time
:查詢窗口的大小,從運行每個查詢的時間開始向后延伸。對於其中use_count_query或use_terms_query設置為true的規則,將忽略此值。
es_host
:是Elasticsearch群集的地址,ElastAlert將在其中存儲有關其狀態,查詢運行,警報和錯誤的數據。
es_port
:es對應的端口。
es_username
:可選的; 用於連接的basic-auth用戶名es_host。
es_password
:可選的; 用於連接的basic-auth密碼es_host。
es_send_get_body_as
:可選的; 方法查詢Elasticsearch - GET,POST或source。默認是GET
writeback_index
:ElastAlert將在其中存儲數據的索引的名稱。我們稍后將創建此索引。
alert_time_limit
:失敗警報的重試窗口。
創建elastalert-create-index索引
# elastalert-create-index
New index name (Default elastalert_status)
Name of existing index to copy (Default None)
New index elastalert_status created
Done!
具體效果
三、使用微信報警
由於ElastAlert沒有內置企業微信的報警方式,我們還需要使用一個開源插件elastalert-wechat-plugin來實現微信的報警,Github項目地址
. 1. 下載項目文件
mkdir -p ~/elastalert/elastalert_modules
# 下載該文件的地址有問題,導致無法下載,因此這一步就不下載了,直接使用下一步修改好的內容復制粘貼創建這個文件即可
# wget -P ~/elastalert/elastalert_modules/ wget https://raw.githubusercontent.com/anjia0532/elastalert-wechat-plugin/master/elastalert_modules/wechat_qiye_alert.py
touch ~/elastalert/elastalert_modules/__init__.py
. 2. 修改插件源碼
由於這個插件是基於python2.x版本開發的,而ElastAlert的最新版本使用的是python3.6版本開發,所以需要改一些代碼,以便正常運行,另外還添添加了轉中文字符功能。
wechat_qiye_alert.py修改后如下:
#! /usr/bin/env python
# -*- coding: utf-8 -*-
import json
import datetime
from elastalert.alerts import Alerter, BasicMatchString
from requests.exceptions import RequestException
from elastalert.util import elastalert_logger,EAException #[感謝minminmsn分享](https://github.com/anjia0532/elastalert-wechat-plugin/issues/2#issuecomment-311014492)
import requests
from elastalert_modules.MyEncoder import MyEncoder
'''
#################################################################
# 微信企業號推送消息 #
# #
# 作者: AnJia <anjia0532@gmail.com> #
# 作者博客: https://anjia.ml/ #
# Github: https://github.com/anjia0532/elastalert-wechat-plugin #
# #
#################################################################
'''
class WeChatAlerter(Alerter):
#企業號id,secret,應用id必填
required_options = frozenset(['corp_id','secret','agent_id'])
def __init__(self, *args):
super(WeChatAlerter, self).__init__(*args)
self.corp_id = self.rule.get('corp_id', '') #企業號id
self.secret = self.rule.get('secret', '') #secret
self.agent_id = self.rule.get('agent_id', '') #應用id
self.party_id = self.rule.get('party_id') #部門id
self.user_id = self.rule.get('user_id', '') #用戶id,多人用 | 分割,全部用 @all
self.tag_id = self.rule.get('tag_id', '') #標簽id
self.access_token = '' #微信身份令牌
self.expires_in=datetime.datetime.now() - datetime.timedelta(seconds=60)
def create_default_title(self, matches):
subject = 'ElastAlert: %s' % (self.rule['name'])
return subject
def alert(self, matches):
if not self.party_id and not self.user_id and not self.tag_id:
elastalert_logger.warn("All touser & toparty & totag invalid")
# 參考elastalert的寫法
# https://github.com/Yelp/elastalert/blob/master/elastalert/alerts.py#L236-L243
body = self.create_alert_body(matches)
#matches 是json格式
#self.create_alert_body(matches)是String格式,詳見 [create_alert_body 函數](https://github.com/Yelp/elastalert/blob/master/elastalert/alerts.py)
# 微信企業號獲取Token文檔
# http://qydev.weixin.qq.com/wiki/index.php?title=AccessToken
self.get_token()
self.senddata(body)
elastalert_logger.info("send message to %s" % (self.corp_id))
def get_token(self):
#獲取token是有次數限制的,本想本地緩存過期時間和token,但是elastalert每次調用都是一次性的,不能全局緩存
if self.expires_in >= datetime.datetime.now() and self.access_token:
return self.access_token
#構建獲取token的url
get_token_url = 'https://qyapi.weixin.qq.com/cgi-bin/gettoken?corpid=%s&corpsecret=%s' %(self.corp_id,self.secret)
try:
response = requests.get(get_token_url)
response.raise_for_status()
except RequestException as e:
raise EAException("get access_token failed , stacktrace:%s" % e)
#sys.exit("get access_token failed, system exit")
token_json = response.json()
if 'access_token' not in token_json :
raise EAException("get access_token failed , , the response is :%s" % response.text())
#sys.exit("get access_token failed, system exit")
#獲取access_token和expires_in
self.access_token = token_json['access_token']
self.expires_in = datetime.datetime.now() + datetime.timedelta(seconds=token_json['expires_in'])
return self.access_token
def senddata(self, content):
#如果需要原始json,需要傳入matches
# http://qydev.weixin.qq.com/wiki/index.php?title=%E6%B6%88%E6%81%AF%E7%B1%BB%E5%9E%8B%E5%8F%8A%E6%95%B0%E6%8D%AE%E6%A0%BC%E5%BC%8F
# 微信企業號有字符長度限制(2048),超長自動截斷
# 參考 http://blog.csdn.net/handsomekang/article/details/9397025
#len utf8 3字節,gbk2 字節,ascii 1字節
if len(content) > 512 :
content = content[:512] + "..."
# 微信發送消息文檔
# http://qydev.weixin.qq.com/wiki/index.php?title=%E6%B6%88%E6%81%AF%E7%B1%BB%E5%9E%8B%E5%8F%8A%E6%95%B0%E6%8D%AE%E6%A0%BC%E5%BC%8F
send_url = 'https://qyapi.weixin.qq.com/cgi-bin/message/send?access_token=%s' %( self.access_token)
headers = {'content-type': 'application/json'}
# 替換消息標題為中文,下面的字段為logstash切分的日志字段
title_dict = {
# "At least": "報警規則:At least",
"@timestamp": "報警時間",
"_index": "索引名稱",
"_type": "索引類型",
"ServerIP": "報警主機",
"hostname": "報警機器",
"message": "報警內容",
"class": "報錯類",
"lineNum": "報錯行",
"num_hits": "文檔命中數",
"num_matches": "文檔匹配數"
}
#print(f"type:{type(content)}")
for k, v in title_dict.items():
content = content.replace(k, v, 1 )
# 最新微信企業號調整校驗規則,tagid必須是string類型,如果是數字類型會報錯,故而使用str()函數進行轉換
payload = {
"touser": self.user_id and str(self.user_id) or '', #用戶賬戶,建議使用tag
"toparty": self.party_id and str(self.party_id) or '', #部門id,建議使用tag
"totag": self.tag_id and str(self.tag_id) or '', #tag可以很靈活的控制發送群體細粒度。比較理想的推送應該是,在heartbeat或者其他elastic工具自定義字段,添加標簽id。這邊根據自定義的標簽id,進行推送
'msgtype': "text",
"agentid": self.agent_id,
"text":{
"content": content.encode('UTF-8').decode("latin1") #避免中文字符發送失敗
},
"safe":"0"
}
# set https proxy, if it was provided
# 如果需要設置代理,可修改此參數並傳入requests
# proxies = {'https': self.pagerduty_proxy} if self.pagerduty_proxy else None
try:
#response = requests.post(send_url, data=json.dumps(payload, ensure_ascii=False), headers=headers)
response = requests.post(send_url, data=json.dumps(payload, cls=MyEncoder, indent=4, ensure_ascii=False), headers=headers)
response.raise_for_status()
except RequestException as e:
raise EAException("send message has error: %s" % e)
elastalert_logger.info("send msg and response: %s" % response.text)
def get_info(self):
return {'type': 'WeChatAlerter'}
在同級目錄下創建MyEncoder.py文件
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import json
class MyEncoder(json.JSONEncoder):
def default(self, obj):
if isinstance(obj, bytes):
return str(obj, encoding='utf-8')
return json.JSONEncoder.default(self, obj)
. 3. 申請企業微信賬號
step 1: 訪問網站 注冊企業微信賬號(不需要企業認證)。
step 2: 訪問apps 創建第三方應用,點擊創建應用按鈕 -> 填寫應用信息:
Step3: 創建部門,獲取部門ID
步驟詳見網址:https://www.cnblogs.com/sanduzxcvbnm/p/13612580.html
. 4. 配置報警規則
配置規則文件
$ cd /app/elastalert/example_rules/
$ cp example_frequency.yaml sms-applog.yaml
$ cat sms-applog.yaml | grep -v ^#
name: 【日志報警】
use_strftine_index: true
type: frequency
index: filebeat-*
num_events: 1
timeframe:
minutes: 1
filter:
- query:
query_string:
query: '"\[ERROR\]" NOT "發送郵件失敗"'
alert:
- "elastalert_modules.wechat_qiye_alert.WeChatAlerter"
corp_id: wxdxxx40b4720f24
secret: xa4pWq63sxxtaZzzEg8X860ZBIoOkToCbh_oNc
agent_id: 1000002
party_id: 2
index
:要查詢的索引的名稱, ES中存在的索引。
num_events
:此參數特定於frequency類型,並且是觸發警報時的閾值。
filter
:用於過濾結果的Elasticsearch過濾器列表,這里的規則定義是除了包含“發送郵件失敗”的錯誤日志,其他所有ERROR的日志都會觸發報警。
alert
:定義報警方式,我們這里采用企業微信報警。
corp_id
:企業微信的接口認證信息
. 5. 運行ElastAlert
# cd ~/elastalert
$ python -m elastalert.elastalert --verbose --config /app/elastalert/config.yaml --rule /app/elastalert/example_rules/sms-applog.yaml
1 rules loaded
INFO:elastalert:Starting up
INFO:elastalert:Disabled rules are: []
INFO:elastalert:Sleeping for 9.999904 seconds
INFO:elastalert:Queried rule 【日志報警】 from 2020-06-05 17:47 CST to 2020-06-05 17:47 CST: 0 / 0 hits
INFO:elastalert:Ran 【日志報警】 from 2020-06-05 17:47 CST to 2020-06-05 17:47 CST: 0 query hits (0 already seen), 0 matches, 0 alerts sent
后台運行
$ nohup python -m elastalert.elastalert --verbose --config /app/elastalert/config.yaml --rule /app/elastalert/example_rules/sms-applog.yaml > nohup.txt 2>&1 &
注意:執行后提示找不到elastalert_modules模塊的話,需要在
~/
目錄下創建elastalert文件夾,然后再把elastalert_modules文件夾給放進去,而不是把elastalert_modules文件夾放在elastalert模塊的安裝路徑下
出現這個提示:
是因為設置的接收報警的是一個報警組,但是應用中設置接收報警的是一個微信賬戶,只需要在應用中找到可接受范圍修改成報警組就行了
正常發送報警的消息:
個人企業微信中收到的消息截圖:
延伸:
1.監控規則中監控的日志索引是:filebeat-*
,也就是說在elk中要有這個日志索引存在
2.監控規則如下:
filter:
- query:
query_string:
query: '"\[ERROR\]" NOT "發送郵件失敗"'
日志索引中只要不包含"發送郵件失敗",凡是包含"[ERROR]"的信息都會觸發報警
因此可以這樣進行驗證:
使用filebeat監控具體某一個日志文件,直接傳輸給es,但是傳輸給ES后創建的索引必須是以"filebeat-"開頭的,具體可以看這個:https://www.cnblogs.com/sanduzxcvbnm/p/12350618.html
然后呢,手動往這個日志文件中寫數據,在kibana中查看這個索引內容,然后查看是否觸發報警:
1.手動往監控的日志文件中寫數據,含有“[ERROR]”信息就行
2.在kibana中查看上一條數據是否寫進來了
3.查看是否報警
4.查看具體報警內容
3.若是想監控多個日志索引怎么辦
可以再創建一個yaml報警規則的文件,然后再啟動一個命令,指定使用這個報警規則文件
4.目錄搞不明白的話可以看如下的目錄結構
當前使用的目錄結構如下:
目錄結構:
/app/elastalert # 以下這些目錄和文件是克隆git地址所提供的
└── build
└── changelog.md
└── config.yaml # 配置文件
└── config.yaml.example
└── dist
└── docker-compose.yml
└── Dockerfile-test
└── docs
└── elastalert
└── elastalert.egg-info
└── example_rules # 報警規則目錄
└── LICENSE
└── Makefile
└── pytest.ini
└── README.md
└── requirements-dev.txt
└── requirements.txt
└── setup.cfg
└── setup.py
└── supervisord.conf.example
└── tests
└── tox.ini
~/elastalert # 這個是單獨創建的目錄
├── elastalert_modules
├ ├── __init__.py
├ ├── wechat_qiye_alert.py
使用的命令需要先進入到~/elastalert
目錄下執行,指定使用的配置文件,指定使用的報警規則
python -m elastalert.elastalert --verbose --config /app/elastalert/config.yaml --rule /app/elastalert/example_rules/sms-applog.yaml
# 后台運行
nohup python -m elastalert.elastalert --verbose --config /app/elastalert/config.yaml --rule /app/elastalert/example_rules/sms-applog.yaml > nohup.txt 2>&1 &
或者采用如下的這種目錄結構:
~/elastalert
├── elastalert_modules # 這個是單獨創建的目錄,里面的這倆文件需要單獨創建
├ ├── __init__.py
├ ├── wechat_qiye_alert.py
└── build # 以下這些目錄是克隆git地址所提供的
└── changelog.md
└── config.yaml # 配置文件
└── config.yaml.example
└── dist
└── docker-compose.yml
└── Dockerfile-test
└── docs
└── elastalert
└── elastalert.egg-info
└── example_rules # 報警規則目錄
└── LICENSE
└── Makefile
└── pytest.ini
└── README.md
└── requirements-dev.txt
└── requirements.txt
└── setup.cfg
└── setup.py
└── supervisord.conf.example
└── tests
└── tox.ini
使用的命令需要先進入到~/elastalert
目錄下執行,指定使用的報警規則
python -m elastalert.elastalert --verbose --rule example_rules/example_frequency.yaml
```