基於的日志進行監控,監控需要一定規則,對觸發監控規則的日志信息進行告警,告警的方式,是短信和郵件。
log4j---->error,info,debug 應用程序程序的日志 error級別 TimeOutException 角標越界IndexXXXException ......Error
com.alibaba.jstorm.daemon.worker.WorkerData]-[INFO] Current worker taskList:[1, 2, 3, 4, 5, 6, 7]
String.contains.(" taskList ")-------------->當訂單量觸發一千萬時,告警通知,讓大家慶祝下。
OrdertotalNum:1000萬
kafaka生成集群的原理、分區
kafka消費者的負載均衡,kfakaSpout
Kafka broker(核心機制,topic,分片,文件存儲機制)
Redis API學習
spout:從外部數據源中讀取數據,然后轉換為topology
架構圖:
DataSource:外部數據源
Spout:接收外部數據源的組件,將外部數據源轉化成storm內部的數據,以Tuple為基本的傳輸單元下發給Bolt.
Bolt:接受Spout發送的數據,或上游的bolt的發送的數據,根據業務邏輯進行處理,發送給下一個Bolt或者是存儲到某種介質上,例如Redis。
Tuple:Storm內部中數據傳輸的基本單元,里面封裝了一個List對象,用來保存數據。
StreamGroup:數據分鍾策略,7種,shuffleGrouping,Non Grouping,FieldGrouping,Local or ShuffleGrouping.
Nimbus:任務分配
Supervisor:接受任務,並啟動worker,worker的數量是根據端口號來的。
Worker:執行任務的具體組件(JVM),可以執行兩種類型的任務,Spout任務或者bolt任務
Task:一個task屬於一個Spout或者Bolt並發任務。
zk:保存任務分配的信息,心跳信息,元數據信息。
1、背景知識
一款優秀的軟件需要具備的特點
l 軟件的實用性。
所謂有的放矢,軟件的誕生是為了解決特定的問題,比如現在流行的MVC 框架,早期的沒有MVC 開發的時候,耦合度很大,后期維護更新成本高,難度大,這樣MVC 框架就孕育而生;比如陌陌這種社交軟件,是為了解決陌生人之間交流的問題;比如疼醒這種軟件是為了解決人們遠程溝通的問題;比如OA系統為了解決公司協同流程、項目管理、知識管理等問題……所以一款優秀的軟件必須能夠解決一個領域內的問題。
l 軟件的穩定性。
軟件的實用性問題解決之后,急需要解決的問題就是軟件的穩定性。一般線上系統都會承載企業的某項業務,系統的穩定性直接影響了業務是否能夠正常運營。很多創業公司在前期只注重業務的發展,不太在意系統的穩定性,一旦用戶兩比較大的之后,就會出現很多性能的問題。這種情況就好比,你找了一個妹子,並准備深入交往后結婚,卻發現這個妹子總是有很多異性朋友在聯系……
l 代碼的規范性
鐵打的營盤流水的兵,一款優秀的軟件不僅僅是功能的實現。整體架構、功能模塊、代碼注釋、擴展性等問題也也需要考慮,畢竟在一個軟件的生命周期過程中,參與的人實在是太多了,主創人員也可能隨時流式。所以代碼的規范性就難能可貴了。
l 升級保持向前兼容性。
如果一個軟件平常使用挺好的,但是升級卻越來越費勁,或者升級后穩定性大打折扣,也難以稱得上一個好的軟件。
l 基本的使用手冊
文檔、文檔、文檔、一個簡單有效的使用手冊,才是程序的王道,知其然才能知其所以然。能讓用戶一目了然,功能、架構、設計思路、代碼等等。
2、需求分析
隨着公司業務發展,支撐公司業務的各種系統越來越多,為了保證公司的業務正常發展,急需要對這些線上系統的運行進行監控,做到問題的及時發現和處理,最大程度減少對業務的影響。
目前系統分類有:
1) 有基於Tomcat的web應用
2) 有獨立的Java Application應用
3) 有運行在Linux上的腳本程序
4) 有大規模的集群框架(zookeeper、Hadoop、Storm、SRP……)
5) 有操作系統的運行日志
主要功能需求分為:
監控系統日志中的內容,按照一定規則進行過濾
發現問題之后通過短信和郵件進行告警
3、功能分析
l 數據輸入
使用flume客戶端獲取個系統的數據;
用戶通過頁面輸入系統名稱、負責人觸發規則等信息
l 數據存儲
使用flume采集數據並存放在kafka集群中
l 數據計算
使用storm編寫程序對日志進行過濾,將滿足過濾規則的信息,通過郵件短信告警並保存到數據庫中
l 數據展示
管理頁面可以查看觸發規則的信息,系統負責人,聯系方式,觸發信息明細等
4、原型設計
產品經理設計產品原形
5、架構設計
5.1、整體架構設計
主要架構為應用+flume+kafka+storm+MySQL+java web。數據流程如下:
1. 應用程序使用log4j產生日志
2. 部署flume客戶端監控應用程序產生的日志信息,並發送到kafka集群中
3. storm spout拉去kafka的數據進行消費,逐條過濾每條日志的進行規則判斷,對符合規則的日志進行郵件告警。
4. 最后將告警的信息保存到mysql數據庫中,用來進行管理。
5.2、Flume設計
l Flume說明
Flume是一個分布式、可靠地、可用的服務,用來收集、聚合、傳輸日志數據。
它是一個基於流式數據的架構,簡單而靈活。具有健壯性、容錯機制、故障轉移、恢復機制。
它提供一個簡單的可擴展的數據模型,容許在線分析程序。F
Flume 作為 cloudera 開發的實時日志收集系統,受到了業界的認可與廣泛應用。
l Flume 設計摘要
使用 Flume EXEC執行一個linux命令來生成數據源。例如,可以用tail命令監控一個文件,那么,只要文件增加內容,EXEC就可以將增加的內容作為數據源發送出去。
使用 org.apache.flume.plugins.KafkaSink,將Flume EXEC產生的數據源發送到Kafka中。
5.3、Kafka設計
l Kafka說明
kafka是一個分布式消息隊列:生產者、消費者的功能。
l Kakfa設計摘要
部署kafka集群,在集群中添加一個Topic:monitor_realtime_javaxy
5.4、Storm設計
l KafkaSpout讀取數據,需要配置Topic:monitor_realtime_javaxy
l FilterBolt判斷規則
l NotifyBolt用來發送郵件或短信息
l Save2DB用來將告警信息寫入mysql數據庫
5.5、 數據模型設計
5.5.1、用戶表
用來保存用戶的信息,包括賬號、手機號碼、郵箱、是否有效等信息
5.5.2、應用表
用來保存應用的信息,包括應用名稱、應用描述、應用是否在線等信息
5.5.3、應用類型表
用來保存應用的類型等信息
5.5.4、規則表
用來保存規則的信息,包括規則名稱,規則描述,規則關鍵詞等信息
5.5.5、規則記錄表
用來保存觸發規則后的記錄,包括告警編號、是否短信告知、是否郵件告知、告警明細等信息。
6、 代碼開發
6.1、 整體結構
6.2、 LogMonitorTopologyMain驅動類
public class LogMonitorTopologyMain {
private static Logger logger = Logger.getLogger(LogMonitorTopologyMain.class);
public static void main(String[] args) throws Exception{
// 使用TopologyBuilder進行構建驅動類
TopologyBuilder builder = new TopologyBuilder();
// 設置kafka的zookeeper集群
// BrokerHosts hosts = new ZkHosts("zk01:2181,zk02:2181,zk03:2181");
//// // 初始化配置信息
// SpoutConfig spoutConfig = new SpoutConfig(hosts, "logmonitor", "/aaa", "log_monitor");
// 在topology中設置spout
// builder.setSpout("kafka-spout", new KafkaSpout(spoutConfig),3);
builder.setSpout("kafka-spout",new RandomSpout(new StringScheme()),2);
builder.setBolt("filter-bolt",new FilterBolt(),3).shuffleGrouping("kafka-spout");
builder.setBolt("prepareRecord-bolt",new PrepareRecordBolt(),2).fieldsGrouping("filter-bolt", new Fields("appId"));
builder.setBolt("saveMessage-bolt",new SaveMessage2MySql(),2).shuffleGrouping("prepareRecord-bolt");
//啟動topology的配置信息
Config topologConf = new Config();
//TOPOLOGY_DEBUG(setDebug), 當它被設置成true的話, storm會記錄下每個組件所發射的每條消息。
//這在本地環境調試topology很有用, 但是在線上這么做的話會影響性能的。
topologConf.setDebug(true);
//storm的運行有兩種模式: 本地模式和分布式模式.
if (args != null && args.length > 0) {
//定義你希望集群分配多少個工作進程給你來執行這個topology
topologConf.setNumWorkers(2);
//向集群提交topology
StormSubmitter.submitTopologyWithProgressBar(args[0], topologConf, builder.createTopology());
} else {
topologConf.setMaxTaskParallelism(3);
LocalCluster cluster = new LocalCluster(www.hbwfjx.cn/);
cluster.submitTopology("word-count", topologConf, builder.createTopology());
Utils.sleep(10000000);
cluster.shutdown();
}
}
}
6.3、FilterBolt用來過濾日志信息
主要是過濾格式和校驗appId是否合法。
public void execute(Tuple input, BasicOutputCollector collector) {
//獲取KafkaSpout發送出來的數據
String line = input.getString(0);
//獲取kafka發送的數據,是一個byte數組
// byte[] value = (byte[]www.sb45475.com) input.getValue(0);
//將數組轉化成字符串
// String line = new String(value);
//對數據進行解析
// appid content
//1 error: Caused by: java.lang.NoClassDefFoundError: com/starit/gejie/dao/SysNameDao
Message message = MonitorHandler.parser(http://www.yuheng119.com/ line);
if (message == null) {
return;
}
if (MonitorHandler.trigger(message)) {
collector.emit(new Values(message.getAppId(), message));
}
//定時更新規則信息
MonitorHandler.scheduleLoad();
}
6.4、PrepareRecordBolt發送郵件告警和短信告警
public void execute(Tuple input, BasicOutputCollector collector) {
Message message = (Message) input.getValueByField("www.yigouylpt2.com message");
String appId = input.getStringByField("appId");
//將觸發規則的信息進行通知
MonitorHandler.notifly(appId, message);
Record record = new Record();
try {
BeanUtils.copyProperties(record, message);
collector.emit(new Values(record));
} catch (Exception e) {
}
}
6.6、 SaveMessage2MySq保存到數據庫
public class SaveMessage2MySql extends BaseBasicBolt {
private static Logger logger = Logger.getLogger(SaveMessage2MySql.class);
public void execute(Tuple input, BasicOutputCollector collector) {
Record record = (Record) input.getValueByField("record");
MonitorHandler.save(record);
}
public void declareOutputFields(OutputFieldsDeclarer declarer) {
}
}
6.7、核心類 MonitorHandler 所有流程處理的核心代碼
public class MonitorHandler {
private static Logger logger = Logger.getLogger(MonitorHandler.class);
//定義一個map,其中appId為Key,以該appId下的所有rule為Value
private static Map<String, List<Rule>> ruleMap;
//定義一個map,其中appId為Key,以該appId下的所有user為Value
private static Map<String, List<User>> userMap;
//定義一個list,用來封裝所有的應用信息
private static List<App> applist;
//定義一個list,用來封裝所有的用戶信息
private static List<User> userList;
//定時加載配置文件的標識
private static boolean reloaded = false;
//定時加載配置文件的標識
private static long nextReload = 0l;
static {
load();
}
/**
* 解析輸入的日志,將數據按照一定的規則進行分割。
* 判斷日志是否合法,主要校驗日志所屬應用的appId是否存在
*
* @param line 一條日志
* @return
*/
public static Message parser(String line) {
//日志內容分為兩個部分:由5個$$$$$符號作為分隔符,第一部分為appid,第二部分為日志內容。
String[] messageArr = line.split("\\$\\$\\$\\$\\$");
//對日志進行校驗
if (messageArr.length != 2) {
return null;
}
if (StringUtils.isBlank(messageArr[0]) || StringUtils.isBlank(messageArr[1])) {
return null;
}
//檢驗當前日志所屬的appid是否是經過授權的。
if (apppIdisValid(messageArr[0].trim())) {
Message message = new Message();
message.setAppId(messageArr[0].trim());
message.setLine(messageArr[1]);
return message;
}
return null;
}
/**
* 驗證appid是否經過授權
*/
private static boolean apppIdisValid(String appId) {
try {
for (App app : applist) {
if (app.getId() == Integer.parseInt(appId)) {
return true;
}
}
} catch (Exception e) {
return false;
}
return false;
}
/**
* 對日志進行規制判定,看看是否觸發規則
* @param message
* @return
*/
public static boolean trigger(Message message) {
//如果規則模型為空,需要初始化加載規則模型
if (ruleMap == null) {
load();
}
//從規則模型中獲取當前appid配置的規則
System.out.println(message.getAppId());
List<Rule> keywordByAppIdList = ruleMap.get(message.getAppId());
for (Rule rule : keywordByAppIdList) {
//如果日志中包含過濾過的關鍵詞,即為匹配成功
if (message.getLine().contains(rule.getKeyword())) {
message.setRuleId(rule.getId() + "");
message.setKeyword(rule.getKeyword());
return true;
}
}
return false;
}
/**
* 加載數據模型,主要是用戶列表、應用管理表、組合規則模型、組合用戶模型。
*/
public static synchronized void load() {
if (userList == null) {
userList = loadUserList();
}
if (applist == null) {
applist = loadAppList();
}
if (ruleMap == null) {
ruleMap = loadRuleMap();
}
if (userMap == null) {
userMap = loadUserMap();
}
}
/**
* 訪問數據庫獲取所有有效的app列表
* @return
*/
private static List<App> loadAppList() {
return new LogMonitorDao().getAppList();
}
/**
* 訪問數據庫獲取所有有效用戶的列表
* @return
*/
private static List<User> loadUserList() {
return new LogMonitorDao().getUserList();
}
/**
* 封裝應用與用戶對應的map
* @return
*/
private static Map<String, List<User>> loadUserMap() {
//以應用的appId為key,以應用的所有負責人的userList對象為value。
//HashMap<String, List<User>>
HashMap<String, List<User>> map = new HashMap<String, List<User>>();
for (App app : applist) {
String userIds = app.getUserId();
List<User> userListInApp = map.get(app.getId());
if (userListInApp == null) {
userListInApp = new ArrayList<User>();
map.put(app.getId() + "", userListInApp);
}
String[] userIdArr = userIds.split(",");
for (String userId : userIdArr) {
userListInApp.add(queryUserById(userId));
}
map.put(app.getId() + "", userListInApp);
}
return map;
}
/**
* 封裝應用與規則的map
* @return
*/
private static Map<String, List<Rule>> loadRuleMap() {
Map<String, List<Rule>> map = new HashMap<String, List<Rule>>();
LogMonitorDao logMonitorDao = new LogMonitorDao();
List<Rule> ruleList = logMonitorDao.getRuleList();
//將代表rule的list轉化成一個map,轉化的邏輯是,
// 從rule.getAppId作為map的key,然后將rule對象作為value傳入map
//Map<appId,ruleList> 一個appid的規則信息,保存在一個list中。
for (Rule rule : ruleList) {
List<Rule> ruleListByAppId = map.get(rule.getAppId()+"");
if (ruleListByAppId == null) {
ruleListByAppId = new ArrayList<Rule>();
map.put(rule.getAppId() + "", ruleListByAppId);
}
ruleListByAppId.add(rule);
map.put(rule.getAppId() + "", ruleListByAppId);
}
return map;
}
/**
* 通過用戶編號獲取用戶的JavaBean
* @param userId
* @return
*/
private static User queryUserById(String userId) {
for (User user : userList) {
if (user.getId() == Integer.parseInt(userId)) {
return user;
}
}
return null;
}
/**
* 通過app編號,獲取當前app的所有負責人列表
* @param appId
* @return
*/
public static List<User> getUserIdsByAppId(String appId) {
return userMap.get(appId);
}
/**
* 告警模塊,用來發送郵件和短信
* 短信功能由於短信資源匱乏,目前默認返回已發送。
* @param appId
* @param message
*/
public static void notifly(String appId, Message message) {
//通過appId獲取應用負責人的對象
List<User> users = getUserIdsByAppId(appId);
//發送郵件
if (sendMail(appId, users, message)) {
message.setIsEmail(1);
}
//發送短信
if (sendSMS(appId, users, message)) {
message.setIsPhone(1);
}
}
/**
* 發送短信的模塊
* 由於短信資源匱乏,目前該功能不開啟,默認true,即短信發送成功。
* 目前發送短信功能使用的是外部接口,外面接口的並發性沒法保證,會影響storm程序運行的效率。
* 后期可以改造為將短信數據發送到外部的消息隊里中,然后創建一個worker去發送短信。
* @param appId
* @param users
* @param message
* @return
*/
private static boolean sendSMS(String appId, List<User> users, Message message) {
// return true;
List<String> mobileList = new ArrayList<String>();
for (User user : users) {
mobileList.add(user.getMobile());
}
for (App app : applist) {
if (app.getId() == Integer.parseInt(appId.trim())) {
message.setAppName(app.getName());
break;
}
}
String content = "系統【" + message.getAppName() + "】在 " + DateUtils.getDateTime() + " 觸發規則 " + message.getRuleId() + ",關鍵字:" + message.getKeyword();
return SMSBase.sendSms(listToStringFormat(mobileList), content);
}
/**
* 發送郵件
* 后期可以改造為將郵件數據發送到外部的消息隊里中,然后創建一個worker去發送短信。
* @param appId
* @param userList
* @param message
* @return
*/
private static boolean sendMail(String appId, List<User> userList, Message message) {
List<String> receiver = new ArrayList<String>();
for (User user : userList) {
receiver.add(user.getEmail());
}
for (App app : applist) {
if (app.getId() == Integer.parseInt(appId.trim())) {
message.setAppName(app.getName());
break;
}
}
if (receiver.size() >= 1) {
String date = DateUtils.getDateTime();
String content = "系統【" + message.getAppName() + "】在 " + date + " 觸發規則 " + message.getRuleId() + " ,過濾關鍵字為:" + message.getKeyword() + " 錯誤內容:" + message.getLine();
MailInfo mailInfo = new MailInfo("系統運行日志監控", content, receiver, null);
return MessageSender.sendMail(mailInfo);
}
return false;
}
/**
* 保存觸發規則的信息,將觸發信息寫入到mysql數據庫中。
*
* @param record
*/
public static void save(Record record) {
new LogMonitorDao().saveRecord(record);
}
/**
* 將list轉換為String
* @param list
* @return
*/
private static String listToStringFormat(List<String> list) {
StringBuilder stringBuilder = new StringBuilder();
for (int i = 0; i < list.size(); i++) {
if (i == list.size() - 1) {
stringBuilder.append(list.get(i));
} else {
stringBuilder.append(list.get(i)).append(",");
}
}
return stringBuilder.toString();
}
/**
* 配置scheduleLoad重新加載底層數據模型。
*/
/**
* thread 4
* thread 3
* thread 2
*/
public static synchronized void reloadDataModel() {
// * thread 1 reloaded = true ----> reloaded = false
// * thread 2 reloaded = false
// * thread 2 reloaded = false
// * thread 2 reloaded = false
if (reloaded) {
long start = System.currentTimeMillis();
userList = loadUserList();
applist = loadAppList();
ruleMap = loadRuleMap();
userMap = loadUserMap();
reloaded = false;
nextReload = 0l;
logger.info("配置文件reload完成,時間:"+DateUtils.getDateTime()+" 耗時:"+ (System.currentTimeMillis()-start));
}
}
/**
* 定時加載配置信息
* 配合reloadDataModel模塊一起使用。
* 主要實現原理如下:
* 1,獲取分鍾的數據值,當分鍾數據是10的倍數,就會觸發reloadDataModel方法,簡稱reload時間。
* 2,reloadDataModel方式是線程安全的,在當前worker中只有一個線程能夠操作。
* 3,為了保證當前線程操作完畢之后,其他線程不再重復操作,設置了一個標識符reloaded。
* 在非reload時間段時,reloaded一直被置為true;
* 在reload時間段時,第一個線程進入reloadDataModel后,加載完畢之后會將reloaded置為false。
*/
public static void scheduleLoad() {
// String date = DateUtils.getDateTime();
// int now = Integer.parseInt(date.split(":")[1]);
// if (now % 10 == 0) {//每10分鍾加載一次
// //1,2,3,4,5,6
// reloadDataModel();
// }else {
// reloaded = true;
// }
if (System.currentTimeMillis()==nextReload){
//thread 1,2,3,
reloadDataModel();
}