前言:
Flume百度定義如下:
Flume是Cloudera提供的一個高可用的,高可靠的,分布式的海量日志采集、聚合和傳輸的系統,Flume支持在日志系統中定制各類數據發送方,用於收集數據;同時,Flume提供對數據進行簡單處理,並寫到各種數據接受方(可定制)的能力。
搭建並使用flume不是特別難,而且網上也有技術文章分享,我不再贅述了。本文主要建立在已經搭建並使用flume的情況。
業務場景:
flume讀取日志是按行讀取,無法進行多行讀取,當出現如下日志時將無法讀到日志的正確時間與類型信息,所以我們需要有一種可以多行讀取日志信息的辦法,這里采用自定義攔截器的方法:
1 2019-08-02 14:34:13.153 [DEBUG][tomcatThreadPool-7][com.xxx.xxx.xxx.xxx.web.CommonHandlerExceptionResolver] (CommonHandlerExceptionResolver.java:134) \n Exception:------------------------------------------------------------------\ncom.xxx.xxx.xx.exceptions.XxxException: \n### Error querying database. Cause: com.mysql.jdbc.PacketTooBigException: Packet for query is too large (5638500 > 4194304). You can change this value on the server by setting the max_allowed_packet' variable.\n### The error may exist in com/xxx/xxx/xxx/basic/mapper/custom/CsWmFrtRateExtMapper.xml\n### The error may involve defaultParameterMap\n### The error occurred while setting parameters\n### SQL: SELECT * FROM ((SELECT cwfr.CS_WM_FRT_RATE_ID AS CS_WM_FRT_RATE_ID, cwfr.ACTIVE_DATE_BEGIN AS ACTIVE_DATE_BEGIN, cwfr.FRT_ITEM_CODE AS FRT_ITEM_CODE, cwfr.FRT_ITEM_NAME AS FRT_ITEM_NAME, cwfr.RP_FLAG AS RP_FLAG, cwfr.FRT_MODE AS FRT_MODE, cwfr.CALCULATION_ITEM AS CALCULATION_ITEM, cwfr.CHARGE_UOM_CODE AS CHARGE_UOM_CODE, cwfr.CHARGE_UOM_NAME AS
對於一些業務系統的日志可能會比較大,超1M,2M甚至更多,可以根據實際情況只截取前面一部分保留下來即可,為了讓功能更具有靈活性,在實現上增加開關屬性,默認打開着,不需要時設置關閉。
自定義攔截器實現的屬性:過濾正則,截斷標識(即開關),總截取最大長度,單個截取最大長度,最后一個事件流。最后一個事件流的作用保留下來與下一批次一起,按正則匹配后才發送出去,因為flume是按批次讀取的,默認是100行,而這個配置又與flume運行內存有關系。這個是屬於參數調優的話題。
特別注意:代碼打包后是需要放到flume安裝目錄下的lib下。放進去后需要重新才會生效。
代碼實現如下:

1 package org.apache.flume.custom; 2 3 import com.google.common.collect.Lists; 4 import org.apache.commons.codec.Charsets; 5 import org.apache.flume.Context; 6 import org.apache.flume.Event; 7 import org.apache.flume.interceptor.Interceptor; 8 9 import java.util.List; 10 import java.util.regex.Matcher; 11 import java.util.regex.Pattern; 12 13 /** 14 * 自定義攔截器 參考 Author: xiufen.huang Create Data: 2019/8/12 15:46 15 */ 16 public class MultInterceptor implements Interceptor { 17 18 // 過濾正則 19 private static Pattern regex = null; 20 // 截取標志 21 private static Boolean cutFlag = true; 22 // 總截取最大長度 23 private static Integer cutMax = null; 24 // 單個截取最大長度 25 private static Integer singleCut = null; 26 // 最后一個事件流 27 private static List<Event> lastList = Lists.newArrayList(); 28 29 @Override 30 public void initialize() { 31 32 } 33 34 @Override 35 public Event intercept(Event event) { 36 // System.out.println("----------intercept(Event event)方法執行,處理單個event"); 37 return event; 38 } 39 40 @Override 41 public List<Event> intercept(List<Event> list) { 42 // System.out.println("進來方法了嗎?"); 43 44 // 處理結果 event list 45 List<Event> intercepted = null; 46 47 int addnum = 0;// 記錄上一個正確匹配的event在隊列中的位置,以便下一event有和它連接的需要 48 49 if (lastList != null && lastList.size() >0){ 50 // 初始化 51 int initCapacity = list.size() + lastList.size(); 52 intercepted = Lists.newArrayListWithCapacity(initCapacity); 53 // 添加 54 intercepted.addAll(lastList); 55 56 // 清空 57 lastList = Lists.newArrayList(); 58 }else { 59 intercepted = Lists.newArrayListWithCapacity(list.size()); 60 } 61 62 // 有正則的情況 63 for (int i = 0; i < list.size(); i++) { 64 Event interceptedEvent = null; 65 Matcher matcher = regex.matcher(new String(list.get(i).getBody(), Charsets.UTF_8)); 66 if (matcher.find()) { 67 interceptedEvent = intercept((Event)list.get(i)); 68 // 單個的body 69 String singleBody = new String(interceptedEvent.getBody(), Charsets.UTF_8); 70 int singleBodyLen = singleBody.length(); 71 System.out.println("正則匹配-原始body---------:" + singleBody); 72 if (cutFlag) { 73 // 處理最大截取數邊界條件--一定要重新一個變量接收 74 int lsSingleCut = singleCut > singleBodyLen ? singleBodyLen : singleCut; 75 // 截取字符串--新變量 76 String singleCutBody = new String(singleBody.substring(0, lsSingleCut)); 77 78 System.out.println("單個截取-截取后body=============:" + singleCutBody); 79 // 重新賦值body 80 interceptedEvent.setBody(singleCutBody.getBytes()); 81 } 82 83 intercepted.add(interceptedEvent); 84 addnum = addnum +1; 85 // System.out.println("matcher.find() 下的:addnum:" + addnum); 86 } else { 87 if (intercepted.size() == 0) { 88 // 表示本次沒有匹配上 89 continue; 90 } 91 92 addnum = addnum >= intercepted.size() ? intercepted.size() - 1 : addnum; 93 94 95 String body = new String(intercepted.get(addnum).getBody(), Charsets.UTF_8) + "\n" 96 + new String(list.get(i).getBody(), Charsets.UTF_8); 97 98 System.out.println("總截取-原始body---------:" + body); 99 int bodyLen = body.length(); 100 // 截取body-新變量 101 String cutBody = body; 102 if (cutFlag) { 103 104 // 處理最大截取數邊界條件--新變量 105 int lsCutMax = cutMax > bodyLen ? bodyLen : cutMax; 106 // 截取字符串 107 cutBody = new String(body.substring(0, lsCutMax)); 108 System.out.println("-處理截取-截取后body=============: " + body); 109 } 110 111 intercepted.get(addnum).setBody(cutBody.getBytes()); 112 } 113 } 114 115 // 最后一個保存在靜態變量,等待下一批次 116 if (intercepted != null && intercepted.size() > 0){ 117 int lastIndex = intercepted.size() -1; 118 lastList.add(intercepted.get(lastIndex)); 119 // 移除最后一個索引 120 intercepted.remove(lastIndex); 121 } 122 123 return intercepted; 124 } 125 126 @Override 127 public void close() { 128 System.out.println("----------自定義攔截器close方法執行"); 129 } 130 131 public static class Builder implements Interceptor.Builder { 132 @Override 133 public Interceptor build() { 134 System.out.println("----------build方法執行"); 135 return new MultInterceptor(); 136 } 137 138 @Override 139 public void configure(Context context) { 140 String regexStr = context.getString("regex", null); 141 cutFlag = context.getBoolean("cutFlag", true); 142 cutMax = context.getInteger("cutMax", 0); 143 singleCut = context.getInteger("singleCut", 0); 144 System.out.println("參數regexStr:" + regexStr + ",參數cutMax: " + cutMax + ",cutFlag: " + cutFlag 145 + " ,singleCut: " + singleCut); 146 147 // 由於外面傳過來的單位是kb,所以這邊需要乘以1024 148 cutMax = cutMax * 1024; 149 System.out.println("總截取最大值:" + cutMax); 150 singleCut = singleCut * 1024; 151 System.out.println("單個截取最大值:" + singleCut); 152 153 if (null != regexStr) { 154 // 轉換正則 155 regex = Pattern.compile(regexStr); 156 } 157 158 } 159 } 160 }
使用說明:
在flume啟動配置文件增加以下內容:
#匹配時間並轉換為時間戳到header中 a1.sources.tail.interceptors.i2.type=org.apache.flume.custom.MultInterceptor$Builder #正則表達式,按需求定 a1.sources.tail.interceptors.i2.regex=(((?!0000)[0-9]{4}-((0[1-9]|1[0-2])-(0[1-9]|1[0-9]|2[0-8])|(0[13-9]|1[0-2])-(29|30)|(0[13578]|1[02])-31)|([0-9]{2}(0[48]|[2468][048]|[13579][26])|(0[48]|[2468][048]|[13579][26])00)-02-29)) #開啟日志長度截取標志,默認true,開啟 a1.sources.tail.interceptors.i2.cutFlag = true #最大截取字符串長度,整數,盡量控制在2M以內,單位:kb,1M=1024 a1.sources.tail.interceptors.i2.cutMax = 2048 #單個截取字符串長度,整數,盡量控制在1.5M以內,單位:kb,1M=1024 a1.sources.tail.interceptors.i2.singleCut=1024 a1.sources.tail.interceptors.i2.serializers=se1 a1.sources.tail.interceptors.i2.serializers.se1.type=org.apache.flume.interceptor.RegexExtractorInterceptorMillisSerializer a1.sources.tail.interceptors.i2.serializers.se1.name=timestamp a1.sources.tail.interceptors.i2.serializers.se1.pattern=yyyy-MM-dd
參考實現: