Flume 自定義攔截器 多行讀取日志+截斷


前言:

  Flume百度定義如下:

       Flume是Cloudera提供的一個高可用的,高可靠的,分布式的海量日志采集、聚合和傳輸的系統,Flume支持在日志系統中定制各類數據發送方,用於收集數據;同時,Flume提供對數據進行簡單處理,並寫到各種數據接受方(可定制)的能力。

搭建並使用flume不是特別難,而且網上也有技術文章分享,我不再贅述了。本文主要建立在已經搭建並使用flume的情況。

 

業務場景:

flume讀取日志是按行讀取,無法進行多行讀取,當出現如下日志時將無法讀到日志的正確時間與類型信息,所以我們需要有一種可以多行讀取日志信息的辦法,這里采用自定義攔截器的方法:

1 2019-08-02 14:34:13.153 [DEBUG][tomcatThreadPool-7][com.xxx.xxx.xxx.xxx.web.CommonHandlerExceptionResolver] (CommonHandlerExceptionResolver.java:134) \n Exception:------------------------------------------------------------------\ncom.xxx.xxx.xx.exceptions.XxxException: \n### Error querying database.  Cause: com.mysql.jdbc.PacketTooBigException: Packet for query is too large (5638500 > 4194304). You can change this value on the server by setting the max_allowed_packet' variable.\n### The error may exist in com/xxx/xxx/xxx/basic/mapper/custom/CsWmFrtRateExtMapper.xml\n### The error may involve defaultParameterMap\n### The error occurred while setting parameters\n### SQL: SELECT * FROM ((SELECT cwfr.CS_WM_FRT_RATE_ID AS CS_WM_FRT_RATE_ID, cwfr.ACTIVE_DATE_BEGIN AS ACTIVE_DATE_BEGIN, cwfr.FRT_ITEM_CODE AS FRT_ITEM_CODE, cwfr.FRT_ITEM_NAME AS FRT_ITEM_NAME, cwfr.RP_FLAG AS RP_FLAG, cwfr.FRT_MODE AS FRT_MODE, cwfr.CALCULATION_ITEM AS CALCULATION_ITEM, cwfr.CHARGE_UOM_CODE AS CHARGE_UOM_CODE, cwfr.CHARGE_UOM_NAME AS 

  

對於一些業務系統的日志可能會比較大,超1M,2M甚至更多,可以根據實際情況只截取前面一部分保留下來即可,為了讓功能更具有靈活性,在實現上增加開關屬性,默認打開着,不需要時設置關閉。

自定義攔截器實現的屬性:過濾正則,截斷標識(即開關),總截取最大長度,單個截取最大長度,最后一個事件流。最后一個事件流的作用保留下來與下一批次一起,按正則匹配后才發送出去,因為flume是按批次讀取的,默認是100行,而這個配置又與flume運行內存有關系。這個是屬於參數調優的話題。

 

特別注意:代碼打包后是需要放到flume安裝目錄下的lib下。放進去后需要重新才會生效。

 

代碼實現如下:

  1 package org.apache.flume.custom;
  2 
  3 import com.google.common.collect.Lists;
  4 import org.apache.commons.codec.Charsets;
  5 import org.apache.flume.Context;
  6 import org.apache.flume.Event;
  7 import org.apache.flume.interceptor.Interceptor;
  8 
  9 import java.util.List;
 10 import java.util.regex.Matcher;
 11 import java.util.regex.Pattern;
 12 
 13 /**
 14  * 自定義攔截器 參考 Author: xiufen.huang Create Data: 2019/8/12 15:46
 15  */
 16 public class MultInterceptor implements Interceptor {
 17 
 18     // 過濾正則
 19     private static Pattern regex = null;
 20     // 截取標志
 21     private static Boolean cutFlag = true;
 22     // 總截取最大長度
 23     private static Integer cutMax = null;
 24     // 單個截取最大長度
 25     private static Integer singleCut = null;
 26     // 最后一個事件流
 27     private static List<Event> lastList = Lists.newArrayList();
 28 
 29     @Override
 30     public void initialize() {
 31 
 32     }
 33 
 34     @Override
 35     public Event intercept(Event event) {
 36 //        System.out.println("----------intercept(Event event)方法執行,處理單個event");  
 37         return event;
 38     }
 39 
 40     @Override
 41     public List<Event> intercept(List<Event> list) {
 42 //        System.out.println("進來方法了嗎?");
 43 
 44         // 處理結果 event list
 45         List<Event> intercepted = null;
 46 
 47         int addnum = 0;// 記錄上一個正確匹配的event在隊列中的位置,以便下一event有和它連接的需要
 48 
 49         if (lastList != null && lastList.size() >0){
 50             // 初始化
 51             int initCapacity = list.size() + lastList.size();
 52             intercepted = Lists.newArrayListWithCapacity(initCapacity);
 53             // 添加
 54             intercepted.addAll(lastList);
 55 
 56             // 清空
 57             lastList = Lists.newArrayList();
 58         }else {
 59             intercepted = Lists.newArrayListWithCapacity(list.size());
 60         }
 61 
 62         // 有正則的情況
 63         for (int i = 0; i < list.size(); i++) {
 64             Event interceptedEvent  = null;
 65             Matcher matcher = regex.matcher(new String(list.get(i).getBody(), Charsets.UTF_8));
 66             if (matcher.find()) {
 67                 interceptedEvent = intercept((Event)list.get(i));
 68                 // 單個的body
 69                 String singleBody = new String(interceptedEvent.getBody(), Charsets.UTF_8);
 70                 int singleBodyLen = singleBody.length();
 71                 System.out.println("正則匹配-原始body---------:" + singleBody);
 72                 if (cutFlag) {
 73                     // 處理最大截取數邊界條件--一定要重新一個變量接收
 74                     int lsSingleCut = singleCut > singleBodyLen ? singleBodyLen : singleCut;
 75                     // 截取字符串--新變量
 76                     String singleCutBody = new String(singleBody.substring(0, lsSingleCut));
 77 
 78                     System.out.println("單個截取-截取后body=============:" + singleCutBody);
 79                     // 重新賦值body
 80                     interceptedEvent.setBody(singleCutBody.getBytes());
 81                 }
 82 
 83                 intercepted.add(interceptedEvent);
 84                 addnum = addnum +1;
 85 //                System.out.println("matcher.find() 下的:addnum:" + addnum);
 86             } else {
 87                 if (intercepted.size() == 0) {
 88                     // 表示本次沒有匹配上
 89                     continue;
 90                 }
 91 
 92                  addnum = addnum >= intercepted.size() ? intercepted.size() - 1 : addnum;
 93 
 94 
 95                 String body = new String(intercepted.get(addnum).getBody(), Charsets.UTF_8) + "\n"
 96                         + new String(list.get(i).getBody(), Charsets.UTF_8);
 97 
 98                 System.out.println("總截取-原始body---------:" + body);
 99                 int bodyLen = body.length();
100                 // 截取body-新變量
101                 String cutBody = body;
102                 if (cutFlag) {
103 
104                     // 處理最大截取數邊界條件--新變量
105                     int lsCutMax = cutMax > bodyLen ? bodyLen : cutMax;
106                     // 截取字符串
107                     cutBody = new String(body.substring(0, lsCutMax));
108                     System.out.println("-處理截取-截取后body=============: " + body);
109                 }
110 
111                 intercepted.get(addnum).setBody(cutBody.getBytes());
112             }
113         }
114 
115         // 最后一個保存在靜態變量,等待下一批次
116         if (intercepted != null && intercepted.size() > 0){
117             int lastIndex = intercepted.size() -1;
118             lastList.add(intercepted.get(lastIndex));
119             // 移除最后一個索引
120             intercepted.remove(lastIndex);
121         }
122 
123         return intercepted;
124     }
125 
126     @Override
127     public void close() {
128         System.out.println("----------自定義攔截器close方法執行");
129     }
130 
131     public static class Builder implements Interceptor.Builder {
132         @Override
133         public Interceptor build() {
134             System.out.println("----------build方法執行");
135             return new MultInterceptor();
136         }
137 
138         @Override
139         public void configure(Context context) {
140             String regexStr = context.getString("regex", null);
141             cutFlag = context.getBoolean("cutFlag", true);
142             cutMax = context.getInteger("cutMax", 0);
143             singleCut = context.getInteger("singleCut", 0);
144             System.out.println("參數regexStr:" + regexStr + ",參數cutMax: " + cutMax + ",cutFlag: " + cutFlag
145                     + " ,singleCut: " + singleCut);
146 
147             // 由於外面傳過來的單位是kb,所以這邊需要乘以1024
148             cutMax = cutMax * 1024;
149             System.out.println("總截取最大值:" + cutMax);
150             singleCut = singleCut * 1024;
151             System.out.println("單個截取最大值:" + singleCut);
152 
153             if (null != regexStr) {
154                 // 轉換正則
155                 regex = Pattern.compile(regexStr);
156             }
157 
158         }
159     }
160 }
View Code

 

使用說明:

在flume啟動配置文件增加以下內容:

#匹配時間並轉換為時間戳到header中
a1.sources.tail.interceptors.i2.type=org.apache.flume.custom.MultInterceptor$Builder
#正則表達式,按需求定
a1.sources.tail.interceptors.i2.regex=(((?!0000)[0-9]{4}-((0[1-9]|1[0-2])-(0[1-9]|1[0-9]|2[0-8])|(0[13-9]|1[0-2])-(29|30)|(0[13578]|1[02])-31)|([0-9]{2}(0[48]|[2468][048]|[13579][26])|(0[48]|[2468][048]|[13579][26])00)-02-29))
#開啟日志長度截取標志,默認true,開啟
a1.sources.tail.interceptors.i2.cutFlag = true
#最大截取字符串長度,整數,盡量控制在2M以內,單位:kb,1M=1024
a1.sources.tail.interceptors.i2.cutMax = 2048
#單個截取字符串長度,整數,盡量控制在1.5M以內,單位:kb,1M=1024
a1.sources.tail.interceptors.i2.singleCut=1024
a1.sources.tail.interceptors.i2.serializers=se1
a1.sources.tail.interceptors.i2.serializers.se1.type=org.apache.flume.interceptor.RegexExtractorInterceptorMillisSerializer
a1.sources.tail.interceptors.i2.serializers.se1.name=timestamp
a1.sources.tail.interceptors.i2.serializers.se1.pattern=yyyy-MM-dd

 

 

參考實現:

flume 自定義攔截器實現多行讀取日志 https://blog.csdn.net/nougats/article/details/71188920

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM