日志監控告警系統的設計與實現


  基於的日志進行監控,監控需要一定規則,對觸發監控規則的日志信息進行告警,告警的方式,是短信和郵件。
  
  log4j---->error,info,debug 應用程序程序的日志 error級別 TimeOutException 角標越界IndexXXXException ......Error
  
  com.alibaba.jstorm.daemon.worker.WorkerData]-[INFO] Current worker taskList:[1, 2, 3, 4, 5, 6, 7]
  
  String.contains.(" taskList ")-------------->當訂單量觸發一千萬時,告警通知,讓大家慶祝下。
  
  OrdertotalNum:1000萬
  
  kafaka生成集群的原理、分區
  
  kafka消費者的負載均衡,kfakaSpout
  
  Kafka broker(核心機制,topic,分片,文件存儲機制)
  
  Redis API學習
  
  spout:從外部數據源中讀取數據,然后轉換為topology
  
  架構圖:
  
  DataSource:外部數據源
  
  Spout:接收外部數據源的組件,將外部數據源轉化成storm內部的數據,以Tuple為基本的傳輸單元下發給Bolt.
  
  Bolt:接受Spout發送的數據,或上游的bolt的發送的數據,根據業務邏輯進行處理,發送給下一個Bolt或者是存儲到某種介質上,例如Redis。
  
  Tuple:Storm內部中數據傳輸的基本單元,里面封裝了一個List對象,用來保存數據。
  
  StreamGroup:數據分鍾策略,7種,shuffleGrouping,Non Grouping,FieldGrouping,Local or ShuffleGrouping.
  
  Nimbus:任務分配
  
  Supervisor:接受任務,並啟動worker,worker的數量是根據端口號來的。
  
  Worker:執行任務的具體組件(JVM),可以執行兩種類型的任務,Spout任務或者bolt任務
  
  Task:一個task屬於一個Spout或者Bolt並發任務。
  
  zk:保存任務分配的信息,心跳信息,元數據信息。
  
  1、背景知識
  
  一款優秀的軟件需要具備的特點
  
  l 軟件的實用性。
  
  所謂有的放矢,軟件的誕生是為了解決特定的問題,比如現在流行的MVC 框架,早期的沒有MVC 開發的時候,耦合度很大,后期維護更新成本高,難度大,這樣MVC 框架就孕育而生;比如陌陌這種社交軟件,是為了解決陌生人之間交流的問題;比如疼醒這種軟件是為了解決人們遠程溝通的問題;比如OA系統為了解決公司協同流程、項目管理、知識管理等問題……所以一款優秀的軟件必須能夠解決一個領域內的問題。
  
  l 軟件的穩定性。
  
  軟件的實用性問題解決之后,急需要解決的問題就是軟件的穩定性。一般線上系統都會承載企業的某項業務,系統的穩定性直接影響了業務是否能夠正常運營。很多創業公司在前期只注重業務的發展,不太在意系統的穩定性,一旦用戶兩比較大的之后,就會出現很多性能的問題。這種情況就好比,你找了一個妹子,並准備深入交往后結婚,卻發現這個妹子總是有很多異性朋友在聯系……
  
  l 代碼的規范性
  
  鐵打的營盤流水的兵,一款優秀的軟件不僅僅是功能的實現。整體架構、功能模塊、代碼注釋、擴展性等問題也也需要考慮,畢竟在一個軟件的生命周期過程中,參與的人實在是太多了,主創人員也可能隨時流式。所以代碼的規范性就難能可貴了。
  
  l 升級保持向前兼容性。
  
  如果一個軟件平常使用挺好的,但是升級卻越來越費勁,或者升級后穩定性大打折扣,也難以稱得上一個好的軟件。
  
  l 基本的使用手冊
  
  文檔、文檔、文檔、一個簡單有效的使用手冊,才是程序的王道,知其然才能知其所以然。能讓用戶一目了然,功能、架構、設計思路、代碼等等。
  
  2、需求分析
  
  隨着公司業務發展,支撐公司業務的各種系統越來越多,為了保證公司的業務正常發展,急需要對這些線上系統的運行進行監控,做到問題的及時發現和處理,最大程度減少對業務的影響。
  
  目前系統分類有:
  
  1) 有基於Tomcat的web應用
  
  2) 有獨立的Java Application應用
  
  3) 有運行在Linux上的腳本程序
  
  4) 有大規模的集群框架(zookeeper、Hadoop、Storm、SRP……)
  
  5) 有操作系統的運行日志
  
  主要功能需求分為:
  
  監控系統日志中的內容,按照一定規則進行過濾
  
  發現問題之后通過短信和郵件進行告警
  
  3、功能分析
  
  l 數據輸入
  
  使用flume客戶端獲取個系統的數據;
  
  用戶通過頁面輸入系統名稱、負責人觸發規則等信息
  
  l 數據存儲
  
  使用flume采集數據並存放在kafka集群中
  
  l 數據計算
  
  使用storm編寫程序對日志進行過濾,將滿足過濾規則的信息,通過郵件短信告警並保存到數據庫中
  
  l 數據展示
  
  管理頁面可以查看觸發規則的信息,系統負責人,聯系方式,觸發信息明細等
  
  4、原型設計
  
  產品經理設計產品原形
  
  5、架構設計
  
  5.1、整體架構設計
  
  主要架構為應用+flume+kafka+storm+MySQL+java web。數據流程如下:
  
  1. 應用程序使用log4j產生日志
  
  2. 部署flume客戶端監控應用程序產生的日志信息,並發送到kafka集群中
  
  3. storm spout拉去kafka的數據進行消費,逐條過濾每條日志的進行規則判斷,對符合規則的日志進行郵件告警。
  
  4. 最后將告警的信息保存到mysql數據庫中,用來進行管理。
  
  5.2、Flume設計
  
  l Flume說明
  
  Flume是一個分布式、可靠地、可用的服務,用來收集、聚合、傳輸日志數據。
  
  它是一個基於流式數據的架構,簡單而靈活。具有健壯性、容錯機制、故障轉移、恢復機制。
  
  它提供一個簡單的可擴展的數據模型,容許在線分析程序。F
  
  Flume 作為 cloudera 開發的實時日志收集系統,受到了業界的認可與廣泛應用。
  
  l Flume 設計摘要
  
  使用 Flume EXEC執行一個linux命令來生成數據源。例如,可以用tail命令監控一個文件,那么,只要文件增加內容,EXEC就可以將增加的內容作為數據源發送出去。
  
  使用 org.apache.flume.plugins.KafkaSink,將Flume EXEC產生的數據源發送到Kafka中。
  
  5.3、Kafka設計
  
  l Kafka說明
  
  kafka是一個分布式消息隊列:生產者、消費者的功能。
  
  l Kakfa設計摘要
  
  部署kafka集群,在集群中添加一個Topic:monitor_realtime_javaxy
  
  5.4、Storm設計
  
  l KafkaSpout讀取數據,需要配置Topic:monitor_realtime_javaxy
  
  l FilterBolt判斷規則
  
  l NotifyBolt用來發送郵件或短信息
  
  l Save2DB用來將告警信息寫入mysql數據庫
  
  5.5、 數據模型設計
  
  5.5.1、用戶表
  
  用來保存用戶的信息,包括賬號、手機號碼、郵箱、是否有效等信息
  
  5.5.2、應用表
  
  用來保存應用的信息,包括應用名稱、應用描述、應用是否在線等信息
  
  5.5.3、應用類型表
  
  用來保存應用的類型等信息
  
  5.5.4、規則表
  
  用來保存規則的信息,包括規則名稱,規則描述,規則關鍵詞等信息
  
  5.5.5、規則記錄表
  
  用來保存觸發規則后的記錄,包括告警編號、是否短信告知、是否郵件告知、告警明細等信息。
  
  6、 代碼開發
  
  6.1、 整體結構
  
  6.2、 LogMonitorTopologyMain驅動類
  
  public class LogMonitorTopologyMain {
  
  private static Logger logger = Logger.getLogger(LogMonitorTopologyMain.class);
  
  public static void main(String[] args) throws Exception{
  
  // 使用TopologyBuilder進行構建驅動類
  
  TopologyBuilder builder = new TopologyBuilder();
  
  // 設置kafka的zookeeper集群
  
  // BrokerHosts hosts = new ZkHosts("zk01:2181,zk02:2181,zk03:2181");
  
  //// // 初始化配置信息
  
  // SpoutConfig spoutConfig = new SpoutConfig(hosts, "logmonitor", "/aaa", "log_monitor");
  
  // 在topology中設置spout
  
  // builder.setSpout("kafka-spout", new KafkaSpout(spoutConfig),3);
  
  builder.setSpout("kafka-spout",new RandomSpout(new StringScheme()),2);
  
  builder.setBolt("filter-bolt",new FilterBolt(),3).shuffleGrouping("kafka-spout");
  
  builder.setBolt("prepareRecord-bolt",new PrepareRecordBolt(),2).fieldsGrouping("filter-bolt", new Fields("appId"));
  
  builder.setBolt("saveMessage-bolt",new SaveMessage2MySql(),2).shuffleGrouping("prepareRecord-bolt");
  
  //啟動topology的配置信息
  
  Config topologConf = new Config();
  
  //TOPOLOGY_DEBUG(setDebug), 當它被設置成true的話, storm會記錄下每個組件所發射的每條消息。
  
  //這在本地環境調試topology很有用, 但是在線上這么做的話會影響性能的。
  
  topologConf.setDebug(true);
  
  //storm的運行有兩種模式: 本地模式和分布式模式.
  
  if (args != null && args.length > 0) {
  
  //定義你希望集群分配多少個工作進程給你來執行這個topology
  
  topologConf.setNumWorkers(2);
  
  //向集群提交topology
  
  StormSubmitter.submitTopologyWithProgressBar(args[0], topologConf, builder.createTopology());
  
  } else {
  
  topologConf.setMaxTaskParallelism(3);
  
  LocalCluster cluster = new LocalCluster(www.hbwfjx.cn/);
  
  cluster.submitTopology("word-count", topologConf, builder.createTopology());
  
  Utils.sleep(10000000);
  
  cluster.shutdown();
  
  }
  
  }
  
  }
  
  6.3、FilterBolt用來過濾日志信息
  
  主要是過濾格式和校驗appId是否合法。
  
  public void execute(Tuple input, BasicOutputCollector collector) {
  
  //獲取KafkaSpout發送出來的數據
  
  String line = input.getString(0);
  
  //獲取kafka發送的數據,是一個byte數組
  
  // byte[] value = (byte[]www.sb45475.com) input.getValue(0);
  
  //將數組轉化成字符串
  
  // String line = new String(value);
  
  //對數據進行解析
  
  // appid content
  
  //1 error: Caused by: java.lang.NoClassDefFoundError: com/starit/gejie/dao/SysNameDao
  
  Message message = MonitorHandler.parser(http://www.yuheng119.com/ line);
  
  if (message == null) {
  
  return;
  
  }
  
  if (MonitorHandler.trigger(message)) {
  
  collector.emit(new Values(message.getAppId(), message));
  
  }
  
  //定時更新規則信息
  
  MonitorHandler.scheduleLoad();
  
  }
  
  6.4、PrepareRecordBolt發送郵件告警和短信告警
  
  public void execute(Tuple input, BasicOutputCollector collector) {
  
  Message message = (Message) input.getValueByField("www.yigouylpt2.com message");
  
  String appId = input.getStringByField("appId");
  
  //將觸發規則的信息進行通知
  
  MonitorHandler.notifly(appId, message);
  
  Record record = new Record();
  
  try {
  
  BeanUtils.copyProperties(record, message);
  
  collector.emit(new Values(record));
  
  } catch (Exception e) {
  
  }
  
  }
  
  6.6、 SaveMessage2MySq保存到數據庫
  
  public class SaveMessage2MySql extends BaseBasicBolt {
  
  private static Logger logger = Logger.getLogger(SaveMessage2MySql.class);
  
  public void execute(Tuple input, BasicOutputCollector collector) {
  
  Record record = (Record) input.getValueByField("record");
  
  MonitorHandler.save(record);
  
  }
  
  public void declareOutputFields(OutputFieldsDeclarer declarer) {
  
  }
  
  }
  
  6.7、核心類 MonitorHandler 所有流程處理的核心代碼
  
  public class MonitorHandler {
  
  private static Logger logger = Logger.getLogger(MonitorHandler.class);
  
  //定義一個map,其中appId為Key,以該appId下的所有rule為Value
  
  private static Map<String, List<Rule>> ruleMap;
  
  //定義一個map,其中appId為Key,以該appId下的所有user為Value
  
  private static Map<String, List<User>> userMap;
  
  //定義一個list,用來封裝所有的應用信息
  
  private static List<App> applist;
  
  //定義一個list,用來封裝所有的用戶信息
  
  private static List<User> userList;
  
  //定時加載配置文件的標識
  
  private static boolean reloaded = false;
  
  //定時加載配置文件的標識
  
  private static long nextReload = 0l;
  
  static {
  
  load();
  
  }
  
  /**
  
  * 解析輸入的日志,將數據按照一定的規則進行分割。
  
  * 判斷日志是否合法,主要校驗日志所屬應用的appId是否存在
  
  *
  
  * @param line 一條日志
  
  * @return
  
  */
  
  public static Message parser(String line) {
  
  //日志內容分為兩個部分:由5個$$$$$符號作為分隔符,第一部分為appid,第二部分為日志內容。
  
  String[] messageArr = line.split("\\$\\$\\$\\$\\$");
  
  //對日志進行校驗
  
  if (messageArr.length != 2) {
  
  return null;
  
  }
  
  if (StringUtils.isBlank(messageArr[0]) || StringUtils.isBlank(messageArr[1])) {
  
  return null;
  
  }
  
  //檢驗當前日志所屬的appid是否是經過授權的。
  
  if (apppIdisValid(messageArr[0].trim())) {
  
  Message message = new Message();
  
  message.setAppId(messageArr[0].trim());
  
  message.setLine(messageArr[1]);
  
  return message;
  
  }
  
  return null;
  
  }
  
  /**
  
  * 驗證appid是否經過授權
  
  */
  
  private static boolean apppIdisValid(String appId) {
  
  try {
  
  for (App app : applist) {
  
  if (app.getId() == Integer.parseInt(appId)) {
  
  return true;
  
  }
  
  }
  
  } catch (Exception e) {
  
  return false;
  
  }
  
  return false;
  
  }
  
  /**
  
  * 對日志進行規制判定,看看是否觸發規則
  
  * @param message
  
  * @return
  
  */
  
  public static boolean trigger(Message message) {
  
  //如果規則模型為空,需要初始化加載規則模型
  
  if (ruleMap == null) {
  
  load();
  
  }
  
  //從規則模型中獲取當前appid配置的規則
  
  System.out.println(message.getAppId());
  
  List<Rule> keywordByAppIdList = ruleMap.get(message.getAppId());
  
  for (Rule rule : keywordByAppIdList) {
  
  //如果日志中包含過濾過的關鍵詞,即為匹配成功
  
  if (message.getLine().contains(rule.getKeyword())) {
  
  message.setRuleId(rule.getId() + "");
  
  message.setKeyword(rule.getKeyword());
  
  return true;
  
  }
  
  }
  
  return false;
  
  }
  
  /**
  
  * 加載數據模型,主要是用戶列表、應用管理表、組合規則模型、組合用戶模型。
  
  */
  
  public static synchronized void load() {
  
  if (userList == null) {
  
  userList = loadUserList();
  
  }
  
  if (applist == null) {
  
  applist = loadAppList();
  
  }
  
  if (ruleMap == null) {
  
  ruleMap = loadRuleMap();
  
  }
  
  if (userMap == null) {
  
  userMap = loadUserMap();
  
  }
  
  }
  
  /**
  
  * 訪問數據庫獲取所有有效的app列表
  
  * @return
  
  */
  
  private static List<App> loadAppList() {
  
  return new LogMonitorDao().getAppList();
  
  }
  
  /**
  
  * 訪問數據庫獲取所有有效用戶的列表
  
  * @return
  
  */
  
  private static List<User> loadUserList() {
  
  return new LogMonitorDao().getUserList();
  
  }
  
  /**
  
  * 封裝應用與用戶對應的map
  
  * @return
  
  */
  
  private static Map<String, List<User>> loadUserMap() {
  
  //以應用的appId為key,以應用的所有負責人的userList對象為value。
  
  //HashMap<String, List<User>>
  
  HashMap<String, List<User>> map = new HashMap<String, List<User>>();
  
  for (App app : applist) {
  
  String userIds = app.getUserId();
  
  List<User> userListInApp = map.get(app.getId());
  
  if (userListInApp == null) {
  
  userListInApp = new ArrayList<User>();
  
  map.put(app.getId() + "", userListInApp);
  
  }
  
  String[] userIdArr = userIds.split(",");
  
  for (String userId : userIdArr) {
  
  userListInApp.add(queryUserById(userId));
  
  }
  
  map.put(app.getId() + "", userListInApp);
  
  }
  
  return map;
  
  }
  
  /**
  
  * 封裝應用與規則的map
  
  * @return
  
  */
  
  private static Map<String, List<Rule>> loadRuleMap() {
  
  Map<String, List<Rule>> map = new HashMap<String, List<Rule>>();
  
  LogMonitorDao logMonitorDao = new LogMonitorDao();
  
  List<Rule> ruleList = logMonitorDao.getRuleList();
  
  //將代表rule的list轉化成一個map,轉化的邏輯是,
  
  // 從rule.getAppId作為map的key,然后將rule對象作為value傳入map
  
  //Map<appId,ruleList> 一個appid的規則信息,保存在一個list中。
  
  for (Rule rule : ruleList) {
  
  List<Rule> ruleListByAppId = map.get(rule.getAppId()+"");
  
  if (ruleListByAppId == null) {
  
  ruleListByAppId = new ArrayList<Rule>();
  
  map.put(rule.getAppId() + "", ruleListByAppId);
  
  }
  
  ruleListByAppId.add(rule);
  
  map.put(rule.getAppId() + "", ruleListByAppId);
  
  }
  
  return map;
  
  }
  
  /**
  
  * 通過用戶編號獲取用戶的JavaBean
  
  * @param userId
  
  * @return
  
  */
  
  private static User queryUserById(String userId) {
  
  for (User user : userList) {
  
  if (user.getId() == Integer.parseInt(userId)) {
  
  return user;
  
  }
  
  }
  
  return null;
  
  }
  
  /**
  
  * 通過app編號,獲取當前app的所有負責人列表
  
  * @param appId
  
  * @return
  
  */
  
  public static List<User> getUserIdsByAppId(String appId) {
  
  return userMap.get(appId);
  
  }
  
  /**
  
  * 告警模塊,用來發送郵件和短信
  
  * 短信功能由於短信資源匱乏,目前默認返回已發送。
  
  * @param appId
  
  * @param message
  
  */
  
  public static void notifly(String appId, Message message) {
  
  //通過appId獲取應用負責人的對象
  
  List<User> users = getUserIdsByAppId(appId);
  
  //發送郵件
  
  if (sendMail(appId, users, message)) {
  
  message.setIsEmail(1);
  
  }
  
  //發送短信
  
  if (sendSMS(appId, users, message)) {
  
  message.setIsPhone(1);
  
  }
  
  }
  
  /**
  
  * 發送短信的模塊
  
  * 由於短信資源匱乏,目前該功能不開啟,默認true,即短信發送成功。
  
  * 目前發送短信功能使用的是外部接口,外面接口的並發性沒法保證,會影響storm程序運行的效率。
  
  * 后期可以改造為將短信數據發送到外部的消息隊里中,然后創建一個worker去發送短信。
  
  * @param appId
  
  * @param users
  
  * @param message
  
  * @return
  
  */
  
  private static boolean sendSMS(String appId, List<User> users, Message message) {
  
  // return true;
  
  List<String> mobileList = new ArrayList<String>();
  
  for (User user : users) {
  
  mobileList.add(user.getMobile());
  
  }
  
  for (App app : applist) {
  
  if (app.getId() == Integer.parseInt(appId.trim())) {
  
  message.setAppName(app.getName());
  
  break;
  
  }
  
  }
  
  String content = "系統【" + message.getAppName() + "】在 " + DateUtils.getDateTime() + " 觸發規則 " + message.getRuleId() + ",關鍵字:" + message.getKeyword();
  
  return SMSBase.sendSms(listToStringFormat(mobileList), content);
  
  }
  
  /**
  
  * 發送郵件
  
  * 后期可以改造為將郵件數據發送到外部的消息隊里中,然后創建一個worker去發送短信。
  
  * @param appId
  
  * @param userList
  
  * @param message
  
  * @return
  
  */
  
  private static boolean sendMail(String appId, List<User> userList, Message message) {
  
  List<String> receiver = new ArrayList<String>();
  
  for (User user : userList) {
  
  receiver.add(user.getEmail());
  
  }
  
  for (App app : applist) {
  
  if (app.getId() == Integer.parseInt(appId.trim())) {
  
  message.setAppName(app.getName());
  
  break;
  
  }
  
  }
  
  if (receiver.size() >= 1) {
  
  String date = DateUtils.getDateTime();
  
  String content = "系統【" + message.getAppName() + "】在 " + date + " 觸發規則 " + message.getRuleId() + " ,過濾關鍵字為:" + message.getKeyword() + " 錯誤內容:" + message.getLine();
  
  MailInfo mailInfo = new MailInfo("系統運行日志監控", content, receiver, null);
  
  return MessageSender.sendMail(mailInfo);
  
  }
  
  return false;
  
  }
  
  /**
  
  * 保存觸發規則的信息,將觸發信息寫入到mysql數據庫中。
  
  *
  
  * @param record
  
  */
  
  public static void save(Record record) {
  
  new LogMonitorDao().saveRecord(record);
  
  }
  
  /**
  
  * 將list轉換為String
  
  * @param list
  
  * @return
  
  */
  
  private static String listToStringFormat(List<String> list) {
  
  StringBuilder stringBuilder = new StringBuilder();
  
  for (int i = 0; i < list.size(); i++) {
  
  if (i == list.size() - 1) {
  
  stringBuilder.append(list.get(i));
  
  } else {
  
  stringBuilder.append(list.get(i)).append(",");
  
  }
  
  }
  
  return stringBuilder.toString();
  
  }
  
  /**
  
  * 配置scheduleLoad重新加載底層數據模型。
  
  */
  
  /**
  
  * thread 4
  
  * thread 3
  
  * thread 2
  
  */
  
  public static synchronized void reloadDataModel() {
  
  // * thread 1 reloaded = true ----> reloaded = false
  
  // * thread 2 reloaded = false
  
  // * thread 2 reloaded = false
  
  // * thread 2 reloaded = false
  
  if (reloaded) {
  
  long start = System.currentTimeMillis();
  
  userList = loadUserList();
  
  applist = loadAppList();
  
  ruleMap = loadRuleMap();
  
  userMap = loadUserMap();
  
  reloaded = false;
  
  nextReload = 0l;
  
  logger.info("配置文件reload完成,時間:"+DateUtils.getDateTime()+" 耗時:"+ (System.currentTimeMillis()-start));
  
  }
  
  }
  
  /**
  
  * 定時加載配置信息
  
  * 配合reloadDataModel模塊一起使用。
  
  * 主要實現原理如下:
  
  * 1,獲取分鍾的數據值,當分鍾數據是10的倍數,就會觸發reloadDataModel方法,簡稱reload時間。
  
  * 2,reloadDataModel方式是線程安全的,在當前worker中只有一個線程能夠操作。
  
  * 3,為了保證當前線程操作完畢之后,其他線程不再重復操作,設置了一個標識符reloaded。
  
  * 在非reload時間段時,reloaded一直被置為true;
  
  * 在reload時間段時,第一個線程進入reloadDataModel后,加載完畢之后會將reloaded置為false。
  
  */
  
  public static void scheduleLoad() {
  
  // String date = DateUtils.getDateTime();
  
  // int now = Integer.parseInt(date.split(":")[1]);
  
  // if (now % 10 == 0) {//每10分鍾加載一次
  
  // //1,2,3,4,5,6
  
  // reloadDataModel();
  
  // }else {
  
  // reloaded = true;
  
  // }
  
  if (System.currentTimeMillis()==nextReload){
  
  //thread 1,2,3,
  
  reloadDataModel();
  
  }


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM