ELK+Kafka+Beats實現海量日志收集平台(二)
三、環境搭建
通過上一小節應用場景和實現原理的介紹,接下來實現所需環境搭建及說明
架構圖如下所示:
環境說明:
192.168.232.6 : 部署了demo項目(用於產生數據日志)
filebeat-6.6.0
192.168.232.3 : Kafka (單體)(Zookeeper:192.168.232.3~5)
192.168.232.4 : Kibana
192.168.232.7 : Logstash (單體)
ES集群:192.168.232.8~10
1、filebeat安裝配置參考https://www.cnblogs.com/jhtian/p/13731230.html
2、Kafka安裝配置參考https://www.cnblogs.com/jhtian/p/13708679.html
3、Logstash安裝配置參考https://www.cnblogs.com/jhtian/p/13744753.html
4、ES集群搭建可參考https://www.cnblogs.com/jhtian/p/12703651.html
5、Kibana安裝可參考https://www.cnblogs.com/jhtian/p/13785029.html
四、部署demo工程項目:
項目結構圖如下,分別調用項目的 /index、/error兩個方法分別打印正常、錯誤日志
(warn及以上級別日志)到logs文件夾中,作為filebeat讀取數據的來源。
web訪問類文件:indexAction.java

1 package com.tianjh.demo.web; 2 3 import com.tianjh.demo.util.SetMDC; 4 import lombok.extern.slf4j.Slf4j; 5 import org.springframework.web.bind.annotation.RequestMapping; 6 import org.springframework.web.bind.annotation.RestController; 7 8 @Slf4j 9 @RestController 10 public class indexAction { 11 12 @RequestMapping(value = "/index") 13 public String index() { 14 SetMDC.putMDC(); 15 log.info("這是一條模擬error日志打印"); 16 log.info("這是一條模擬warn日志打印"); 17 log.info("這是一條模擬info日志打印"); 18 return "hello word"; 19 } 20 21 @RequestMapping(value = "/err") 22 public String error() { 23 SetMDC.putMDC(); 24 try { 25 int a = 5/0; 26 } catch (Exception e) { 27 log.error("算術異常", e); 28 } 29 return "error"; 30 } 31 }
工具類Utils
FastJsonConvertUtil.java

1 package com.tianjh.demo.util; 2 3 import java.util.ArrayList; 4 import java.util.List; 5 6 import com.alibaba.fastjson.JSON; 7 import com.alibaba.fastjson.JSONObject; 8 import com.alibaba.fastjson.serializer.SerializerFeature; 9 10 import lombok.extern.slf4j.Slf4j; 11 12 /** 13 * $FastJsonConvertUtil 14 * @author hezhuo.bai 15 * @since 2019年1月15日 下午4:53:28 16 */ 17 @Slf4j 18 public class FastJsonConvertUtil { 19 20 private static final SerializerFeature[] featuresWithNullValue = { SerializerFeature.WriteMapNullValue, SerializerFeature.WriteNullBooleanAsFalse, 21 SerializerFeature.WriteNullListAsEmpty, SerializerFeature.WriteNullNumberAsZero, SerializerFeature.WriteNullStringAsEmpty }; 22 23 /** 24 * <B>方法名稱:</B>將JSON字符串轉換為實體對象<BR> 25 * <B>概要說明:</B>將JSON字符串轉換為實體對象<BR> 26 * @author hezhuo.bai 27 * @since 2019年1月15日 下午4:53:49 28 * @param data JSON字符串 29 * @param clzss 轉換對象 30 * @return T 31 */ 32 public static <T> T convertJSONToObject(String data, Class<T> clzss) { 33 try { 34 T t = JSON.parseObject(data, clzss); 35 return t; 36 } catch (Exception e) { 37 log.error("convertJSONToObject Exception", e); 38 return null; 39 } 40 } 41 42 /** 43 * <B>方法名稱:</B>將JSONObject對象轉換為實體對象<BR> 44 * <B>概要說明:</B>將JSONObject對象轉換為實體對象<BR> 45 * @author hezhuo.bai 46 * @since 2019年1月15日 下午4:54:32 47 * @param data JSONObject對象 48 * @param clzss 轉換對象 49 * @return T 50 */ 51 public static <T> T convertJSONToObject(JSONObject data, Class<T> clzss) { 52 try { 53 T t = JSONObject.toJavaObject(data, clzss); 54 return t; 55 } catch (Exception e) { 56 log.error("convertJSONToObject Exception", e); 57 return null; 58 } 59 } 60 61 /** 62 * <B>方法名稱:</B>將JSON字符串數組轉為List集合對象<BR> 63 * <B>概要說明:</B>將JSON字符串數組轉為List集合對象<BR> 64 * @author hezhuo.bai 65 * @since 2019年1月15日 下午4:54:50 66 * @param data JSON字符串數組 67 * @param clzss 轉換對象 68 * @return List<T>集合對象 69 */ 70 public static <T> List<T> convertJSONToArray(String data, Class<T> clzss) { 71 try { 72 List<T> t = JSON.parseArray(data, clzss); 73 return t; 74 } catch (Exception e) { 75 log.error("convertJSONToArray Exception", e); 76 return null; 77 } 78 } 79 80 /** 81 * <B>方法名稱:</B>將List<JSONObject>轉為List集合對象<BR> 82 * <B>概要說明:</B>將List<JSONObject>轉為List集合對象<BR> 83 * @author hezhuo.bai 84 * @since 2019年1月15日 下午4:55:11 85 * @param data List<JSONObject> 86 * @param clzss 轉換對象 87 * @return List<T>集合對象 88 */ 89 public static <T> List<T> convertJSONToArray(List<JSONObject> data, Class<T> clzss) { 90 try { 91 List<T> t = new ArrayList<T>(); 92 for (JSONObject jsonObject : data) { 93 t.add(convertJSONToObject(jsonObject, clzss)); 94 } 95 return t; 96 } catch (Exception e) { 97 log.error("convertJSONToArray Exception", e); 98 return null; 99 } 100 } 101 102 /** 103 * <B>方法名稱:</B>將對象轉為JSON字符串<BR> 104 * <B>概要說明:</B>將對象轉為JSON字符串<BR> 105 * @author hezhuo.bai 106 * @since 2019年1月15日 下午4:55:41 107 * @param obj 任意對象 108 * @return JSON字符串 109 */ 110 public static String convertObjectToJSON(Object obj) { 111 try { 112 String text = JSON.toJSONString(obj); 113 return text; 114 } catch (Exception e) { 115 log.error("convertObjectToJSON Exception", e); 116 return null; 117 } 118 } 119 120 /** 121 * <B>方法名稱:</B>將對象轉為JSONObject對象<BR> 122 * <B>概要說明:</B>將對象轉為JSONObject對象<BR> 123 * @author hezhuo.bai 124 * @since 2019年1月15日 下午4:55:55 125 * @param obj 任意對象 126 * @return JSONObject對象 127 */ 128 public static JSONObject convertObjectToJSONObject(Object obj){ 129 try { 130 JSONObject jsonObject = (JSONObject) JSONObject.toJSON(obj); 131 return jsonObject; 132 } catch (Exception e) { 133 log.error("convertObjectToJSONObject Exception", e); 134 return null; 135 } 136 } 137 138 public static String convertObjectToJSONWithNullValue(Object obj) { 139 try { 140 String text = JSON.toJSONString(obj, featuresWithNullValue); 141 return text; 142 } catch (Exception e) { 143 log.error("convertObjectToJSONWithNullValue Exception", e); 144 return null; 145 } 146 } 147 }
NetUtil.java

1 package com.tianjh.demo.util; 2 3 import java.lang.management.ManagementFactory; 4 import java.lang.management.RuntimeMXBean; 5 import java.net.InetAddress; 6 import java.net.NetworkInterface; 7 import java.net.SocketAddress; 8 import java.net.UnknownHostException; 9 import java.nio.channels.SocketChannel; 10 import java.util.Enumeration; 11 import java.util.regex.Matcher; 12 import java.util.regex.Pattern; 13 14 /** 15 * $NetUtil 16 * 獲取本機地址 端口號的工具類 17 * * @author hezhuo.bai 18 * * @since 2019年1月15日 下午4:53:28 19 */ 20 public class NetUtil { 21 22 public static String normalizeAddress(String address){ 23 String[] blocks = address.split("[:]"); 24 if(blocks.length > 2){ 25 throw new IllegalArgumentException(address + " is invalid"); 26 } 27 String host = blocks[0]; 28 int port = 80; 29 if(blocks.length > 1){ 30 port = Integer.valueOf(blocks[1]); 31 } else { 32 address += ":"+port; //use default 80 33 } 34 String serverAddr = String.format("%s:%d", host, port); 35 return serverAddr; 36 } 37 38 public static String getLocalAddress(String address){ 39 String[] blocks = address.split("[:]"); 40 if(blocks.length != 2){ 41 throw new IllegalArgumentException(address + " is invalid address"); 42 } 43 String host = blocks[0]; 44 int port = Integer.valueOf(blocks[1]); 45 46 if("0.0.0.0".equals(host)){ 47 return String.format("%s:%d",NetUtil.getLocalIp(), port); 48 } 49 return address; 50 } 51 52 private static int matchedIndex(String ip, String[] prefix){ 53 for(int i=0; i<prefix.length; i++){ 54 String p = prefix[i]; 55 if("*".equals(p)){ //*, assumed to be IP 56 if(ip.startsWith("127.") || 57 ip.startsWith("10.") || 58 ip.startsWith("172.") || 59 ip.startsWith("192.")){ 60 continue; 61 } 62 return i; 63 } else { 64 if(ip.startsWith(p)){ 65 return i; 66 } 67 } 68 } 69 70 return -1; 71 } 72 73 public static String getLocalIp(String ipPreference) { 74 if(ipPreference == null){ 75 ipPreference = "*>10>172>192>127"; 76 } 77 String[] prefix = ipPreference.split("[> ]+"); 78 try { 79 Pattern pattern = Pattern.compile("[0-9]+\\.[0-9]+\\.[0-9]+\\.[0-9]+"); 80 Enumeration<NetworkInterface> interfaces = NetworkInterface.getNetworkInterfaces(); 81 String matchedIp = null; 82 int matchedIdx = -1; 83 while (interfaces.hasMoreElements()) { 84 NetworkInterface ni = interfaces.nextElement(); 85 Enumeration<InetAddress> en = ni.getInetAddresses(); 86 while (en.hasMoreElements()) { 87 InetAddress addr = en.nextElement(); 88 String ip = addr.getHostAddress(); 89 Matcher matcher = pattern.matcher(ip); 90 if (matcher.matches()) { 91 int idx = matchedIndex(ip, prefix); 92 if(idx == -1) continue; 93 if(matchedIdx == -1){ 94 matchedIdx = idx; 95 matchedIp = ip; 96 } else { 97 if(matchedIdx>idx){ 98 matchedIdx = idx; 99 matchedIp = ip; 100 } 101 } 102 } 103 } 104 } 105 if(matchedIp != null) return matchedIp; 106 return "127.0.0.1"; 107 } catch (Exception e) { 108 return "127.0.0.1"; 109 } 110 } 111 112 public static String getLocalIp() { 113 return getLocalIp("*>10>172>192>127"); 114 } 115 116 public static String remoteAddress(SocketChannel channel){ 117 SocketAddress addr = channel.socket().getRemoteSocketAddress(); 118 String res = String.format("%s", addr); 119 return res; 120 } 121 122 public static String localAddress(SocketChannel channel){ 123 SocketAddress addr = channel.socket().getLocalSocketAddress(); 124 String res = String.format("%s", addr); 125 return addr==null? res: res.substring(1); 126 } 127 128 public static String getPid(){ 129 RuntimeMXBean runtime = ManagementFactory.getRuntimeMXBean(); 130 String name = runtime.getName(); 131 int index = name.indexOf("@"); 132 if (index != -1) { 133 return name.substring(0, index); 134 } 135 return null; 136 } 137 138 public static String getLocalHostName() { 139 try { 140 return (InetAddress.getLocalHost()).getHostName(); 141 } catch (UnknownHostException uhe) { 142 String host = uhe.getMessage(); 143 if (host != null) { 144 int colon = host.indexOf(':'); 145 if (colon > 0) { 146 return host.substring(0, colon); 147 } 148 } 149 return "UnknownHost"; 150 } 151 } 152 }
SetMDC.java

1 package com.tianjh.demo.util; 2 3 import org.jboss.logging.MDC; 4 import org.springframework.context.EnvironmentAware; 5 import org.springframework.core.env.Environment; 6 import org.springframework.stereotype.Component; 7 8 @Component 9 public class SetMDC implements EnvironmentAware { 10 11 private static Environment environment; 12 13 @Override 14 public void setEnvironment(Environment environment) { 15 SetMDC.environment = environment; 16 } 17 18 public static void putMDC() { 19 MDC.put("hostName", NetUtil.getLocalHostName()); 20 MDC.put("ip", NetUtil.getLocalIp()); 21 MDC.put("applicationName", environment.getProperty("spring.application.name")); 22 } 23 24 }
及關鍵配置文件
pom.xml

1 <?xml version="1.0" encoding="UTF-8"?> 2 <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 3 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"> 4 <modelVersion>4.0.0</modelVersion> 5 <parent> 6 <groupId>org.springframework.boot</groupId> 7 <artifactId>spring-boot-starter-parent</artifactId> 8 <version>2.1.5.RELEASE</version> 9 <relativePath/> <!-- lookup parent from repository --> 10 </parent> 11 <groupId>com.tianjh</groupId> 12 <artifactId>demo</artifactId> 13 <version>0.0.1-SNAPSHOT</version> 14 <name>demo</name> 15 <description>Demo project for Spring Boot</description> 16 17 <properties> 18 <java.version>1.8</java.version> 19 </properties> 20 21 22 <dependencies> 23 <dependency> 24 <groupId>org.springframework.boot</groupId> 25 <artifactId>spring-boot-starter-web</artifactId> 26 <!-- 排除spring-boot-starter-logging --> 27 <exclusions> 28 <exclusion> 29 <groupId>org.springframework.boot</groupId> 30 <artifactId>spring-boot-starter-logging</artifactId> 31 </exclusion> 32 </exclusions> 33 </dependency> 34 35 <dependency> 36 <groupId>org.springframework.boot</groupId> 37 <artifactId>spring-boot-starter-test</artifactId> 38 <scope>test</scope> 39 </dependency> 40 <dependency> 41 <groupId>org.projectlombok</groupId> 42 <artifactId>lombok</artifactId> 43 </dependency> 44 <!-- log4j2 --> 45 <dependency> 46 <groupId>org.springframework.boot</groupId> 47 <artifactId>spring-boot-starter-log4j2</artifactId> 48 </dependency> 49 <dependency> 50 <groupId>com.lmax</groupId> 51 <artifactId>disruptor</artifactId> 52 <version>3.3.4</version> 53 </dependency> 54 55 <dependency> 56 <groupId>com.alibaba</groupId> 57 <artifactId>fastjson</artifactId> 58 <version>1.2.58</version> 59 </dependency> 60 61 </dependencies> 62 63 <build> 64 <finalName>demo</finalName> 65 <!-- 打包時包含properties、xml --> 66 <resources> 67 <resource> 68 <directory>src/main/java</directory> 69 <includes> 70 <include>**/*.properties</include> 71 <include>**/*.xml</include> 72 </includes> 73 <!-- 是否替換資源中的屬性--> 74 <filtering>true</filtering> 75 </resource> 76 <resource> 77 <directory>src/main/resources</directory> 78 </resource> 79 </resources> 80 81 <plugins> 82 <plugin> 83 <groupId>org.springframework.boot</groupId> 84 <artifactId>spring-boot-maven-plugin</artifactId> 85 <configuration> 86 <mainClass>com.tianjh.demo.Application</mainClass> 87 </configuration> 88 </plugin> 89 </plugins> 90 </build> 91 92 </project>
Spring配置文件

1 server.servlet.context-path=/ 2 server.port=8001 3 4 spring.application.name=demo 5 spring.http.encoding.charset=UTF-8 6 spring.jackson.date-format=yyyy-MM-dd HH:mm:ss 7 spring.jackson.time-zone=GMT+8 8 spring.jackson.default-property-inclusion=NON_NULL
日志配置文件

1 <?xml version="1.0" encoding="UTF-8"?> 2 <Configuration status="INFO" schema="Log4J-V2.0.xsd" monitorInterval="600"> 3 <Properties> 4 <!-- 日志要輸出的文件--> 5 <Property name="LOG_HOME">logs</Property> 6 <!-- 項目名稱--> 7 <property name="FILE_NAME">demo</property> 8 <!-- 日志輸出格式--> 9 <property name="patternLayout">[%d{yyyy-MM-dd'T'HH:mm:ss.SSSZZ}] [%level{length=5}] [%thread-%tid] [%logger] 10 [%X{hostName}] [%X{ip}] [%X{applicationName}] [%F,%L,%C,%M] [%m] ## '%ex'%n 11 </property> 12 </Properties> 13 <Appenders> 14 <Console name="CONSOLE" target="SYSTEM_OUT"> 15 <PatternLayout pattern="${patternLayout}"/> 16 </Console> 17 <RollingRandomAccessFile name="appAppender" fileName="${LOG_HOME}/all-${FILE_NAME}.log" 18 filePattern="${LOG_HOME}/all-${FILE_NAME}-%d{yyyy-MM-dd}-%i.log"> 19 <PatternLayout pattern="${patternLayout}"/> 20 <Policies> 21 <TimeBasedTriggeringPolicy interval="1"/> 22 <SizeBasedTriggeringPolicy size="500MB"/> 23 </Policies> 24 <DefaultRolloverStrategy max="20"/> 25 </RollingRandomAccessFile> 26 <RollingRandomAccessFile name="errorAppender" fileName="${LOG_HOME}/err-${FILE_NAME}.log" 27 filePattern="${LOG_HOME}/err-${FILE_NAME}-%d{yyyy-MM-dd}-%i.log"> 28 <PatternLayout pattern="${patternLayout}"/> 29 <Filters> 30 <ThresholdFilter level="warn" onMatch="ACCEPT" onMismatch="DENY"/> 31 </Filters> 32 <Policies> 33 <TimeBasedTriggeringPolicy interval="1"/> 34 <SizeBasedTriggeringPolicy size="500MB"/> 35 </Policies> 36 <DefaultRolloverStrategy max="20"/> 37 </RollingRandomAccessFile> 38 </Appenders> 39 <Loggers> 40 <!-- 業務相關 異步logger --> 41 <AsyncLogger name="com.tianjh.*" level="info" includeLocation="true"> 42 <AppenderRef ref="appAppender"/> 43 </AsyncLogger> 44 <AsyncLogger name="com.tianjh.*" level="info" includeLocation="true"> 45 <AppenderRef ref="errorAppender"/> 46 </AsyncLogger> 47 <Root level="info"> 48 <Appender-Ref ref="CONSOLE"/> 49 <Appender-Ref ref="appAppender"/> 50 <AppenderRef ref="errorAppender"/> 51 </Root> 52 </Loggers> 53 </Configuration>
打包上傳到192.168.232.6這台服務器進行運行
運行之后調用該項目的index方法
在項目指定的文件夾里生成了咋們所要的日志文件
參考前面的鏈接安裝好所有環境后,filebeat、kafka、Logstash、es都應該配置好了
接下來就結合filebeat(生產者-Producer)、kafka(broker)、Logstash(消費者-Consumer)
實現fielbeat從demo.jar項目輸出的日志文件logs下讀取all-demo.log 、err-demo.log兩個日志文件,然后
把相應日志數據發送到kafka中,再由Logstash到Kafka中獲取數據進行消費。
在這個過程中需要在kafka中新增兩個topic
./kafka-topics.sh --zookeeper 192.168.232.3:2181 --create --topic all-log-demo
--partitions 1 --replication-factor 1
./kafka-topics.sh --zookeeper 192.168.232.3:2181 --create --topic err-log-demo
--partitions 1 --replication-factor 1
也就是采集到的all-demo.log日志數據放入topic:all-log-demo 中,而采集到的err-demo.log日
志數據放入topic:err-log-demo 中。
五、測試
啟動demo.jar、filebeat、kafka、logstash
啟動demo.jar之前先刪除掉之前測試的日志文件all-demo.log 、err-demo.log
啟動demo.jar
啟動kafka
啟動Logstash
隨后通過瀏覽器訪問:
http://192.168.232.6:8001/err
http://192.168.232.6:8001/index 兩個地址來調用index、err方法打印日志文件。
demo.jar 項目控制台輸出了日志---調用index方法
demo.jar 項目控制台輸出了日志---調用err方法
在調用上述兩個方法之后,filebeat會將日志數據發送到kafka
通過使用如下的命令進行查看消費情況:
./kafka-consumer-groups.sh --bootstrap-server 192.168.232.3:9092 --describe
--group all-log-group
./kafka-consumer-groups.sh --bootstrap-server 192.168.232.3:9092 --describe
--group err-log-group
在kafka上也有相應的日志消費記錄情況,( all-log-group、err-log-group )是在Logstash中進
行配置的。
通過前面的幾個流程之后,現在日志數據到達了kafka-broker之上,現在就需要用logstash來進
行消費數據,logstash上也實時進行數據消費,如下圖所示是全量日志過濾:
all-log-demo.log 日志
err-log-demo.log 日志
通過上述的一系列操作,簡單的實現了日志數據的生成、采集、過濾。這其中最為重要也最核心
的地方就是kafka,利用kafka的高性能來緩存filebeat生成的海量數據,從而讓logstash慢慢的進
行消費,當然上面的例子並不能體現出kafka處理海量數據的能力。