Some of our async log appenders are configured to never discard events, even when the queue is full. Under high concurrency this makes the request thread fall back to writing logs synchronously, and responses slow down.
Besides centralizing log output, I wrote this thing hoping it would also act as a buffer at high-concurrency moments.
I previously implemented this with Kafka, and the enqueue speed was disappointing — slower, frankly, than my writes to a local mechanical disk... I don't know whether I was using it the wrong way; my guess is Kafka is heavily optimized for sequential writes, which would explain the odd result that writing on an SSD was even slower than on a mechanical disk...
Later I implemented it with Redis. Because of how Redis connections work, a pipeline is required to amortize the cost of acquiring a connection per command; with that, the enqueue throughput turned out quite good.
(Playing devil's advocate: this adds program complexity and operations cost, which is not scientific at all. Building a distributed file system out of servers with SSD arrays and mounting it on the application servers would be the real answer!)
Because the pipeline does not write out until the buffered commands reach the flush threshold (by default, 53 args), if no new log entries happen to arrive at that moment, whatever is buffered could sit there waiting indefinitely. So the appender runs a scheduled thread that flushes on a fixed period, which can be tuned as needed; a sketch of the pipelining behavior follows below.
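Before the real code, here is a minimal, standalone sketch of the pipelining idea (hypothetical — it assumes a reachable Redis at 192.168.1.207:6379 and uses a throwaway key, not the appender's real one): commands queue up client-side and only hit the network when sync() is called, which is exactly why an idle pipeline needs a timer-driven flush.

import redis.clients.jedis.Jedis;
import redis.clients.jedis.Pipeline;

public class PipelineSketch {
    public static void main(String[] args) {
        try (Jedis jedis = new Jedis("192.168.1.207", 6379)) {
            Pipeline pipeline = jedis.pipelined();
            for (int i = 0; i < 100; i++) {
                // Buffered in the client; no per-command network round trip.
                pipeline.lpush("demo-queue", "log line " + i);
            }
            // One flush sends all 100 commands and reads all replies.
            pipeline.sync();
        }
    }
}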
Related earlier post: logback KafkaAppender — writing logs into a Kafka queue for centralized output.
Complete POM for the producer
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.lzw</groupId>
    <artifactId>logqueue</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>
        <dependency>
            <groupId>redis.clients</groupId>
            <artifactId>jedis</artifactId>
            <version>2.9.0</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>ch.qos.logback</groupId>
            <artifactId>logback-core</artifactId>
            <version>1.2.3</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>1.7.25</version>
            <scope>provided</scope>
        </dependency>
    </dependencies>

    <distributionManagement>
        <repository>
            <id>maven-releases</id>
            <name>maven-releases</name>
            <url>http://192.168.91.137:8081/repository/maven-releases/</url>
        </repository>
        <snapshotRepository>
            <id>maven-snapshots</id>
            <name>maven-snapshots</name>
            <url>http://192.168.91.137:8081/repository/maven-snapshots/</url>
        </snapshotRepository>
    </distributionManagement>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>
There is only one class, RedisAppender.java
package org.lzw.log.appender;

import ch.qos.logback.core.AppenderBase;
import ch.qos.logback.core.Layout;
import ch.qos.logback.core.status.ErrorStatus;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import redis.clients.jedis.Client;
import redis.clients.jedis.Pipeline;

import java.io.IOException;
import java.io.StringReader;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * User: laizhenwei
 */
public class RedisAppender<E> extends AppenderBase<E> {

    protected Layout<E> layout;
    // Fallback logger: anything that cannot reach Redis goes to the "local" logger,
    // which the application's logback.xml routes to a local file.
    private static final Logger LOGGER = LoggerFactory.getLogger("local");
    private Pipeline pipeline;
    private Client client;
    private static ScheduledThreadPoolExecutor exec = new ScheduledThreadPoolExecutor(1);
    private String queueKey;
    private String redisProperties;

    @Override
    public void start() {
        int errors = 0;
        if (this.layout == null) {
            this.addStatus(new ErrorStatus("No layout set for the appender named \"" + this.name + "\".", this));
            ++errors;
        }
        LOGGER.info("Starting RedisAppender...");
        final Properties properties = new Properties();
        try {
            properties.load(new StringReader(redisProperties));
            pipeline = new Pipeline();
            client = new Client(properties.get("host").toString(), Integer.parseInt(properties.get("port").toString()));
            pipeline.setClient(client);
        } catch (Exception exception) {
            ++errors;
            LOGGER.warn("Failed to initialize the Redis pipeline; falling back to the local log:", exception);
        }
        if (queueKey == null) {
            ++errors;
            System.out.println("queueKey is not configured");
        } else {
            System.out.println("Logs will be pushed to the queue with key [" + queueKey + "]!");
        }
        if (errors == 0) {
            super.start();
            // Flush the pipeline periodically so buffered entries never wait indefinitely.
            exec.scheduleAtFixedRate(() -> this.sync(), 5, 5, TimeUnit.SECONDS);
        }
    }

    @Override
    public void stop() {
        super.stop();
        if (pipeline == null) {
            return;
        }
        pipeline.sync();
        try {
            pipeline.close();
        } catch (IOException e) {
            LOGGER.warn("Failed to close the Redis pipeline:", e);
        }
        LOGGER.info("Stopping RedisAppender...");
    }

    @Override
    protected void append(E event) {
        String msg = layout.doLayout(event);
        this.lpush(msg);
    }

    private void lpush(String msg) {
        try {
            pipeline.lpush(queueKey, msg);
        } catch (Exception ex) {
            LOGGER.warn(String.join(":", "Failed to push to the Redis queue!", msg), ex);
        }
    }

    private void sync() {
        try {
            pipeline.sync();
        } catch (Exception ex) {
            // If the flush fails, dump whatever the client buffered to the local fallback log.
            List<Object> datas = client.getAll();
            datas.stream().forEach(d -> LOGGER.warn(String.join(":", "Failed to push to the Redis queue! Logging locally!", d.toString())));
        }
    }

    public String getQueueKey() {
        return queueKey;
    }

    public void setQueueKey(String queueKey) {
        this.queueKey = queueKey;
    }

    public void setLayout(Layout<E> layout) {
        this.layout = layout;
    }

    public String getRedisProperties() {
        return redisProperties;
    }

    public void setRedisProperties(String redisProperties) {
        this.redisProperties = redisProperties;
    }
}
Done writing — publish it to Nexus (the distributionManagement section in the POM above points at the target repositories).

Consumer application-test.yml
spring:
  application:
    name: logconsumer
  profiles:
    # which profile to read: dev (development), prod (production), qa (testing)
    active: test

logKey:
  basic-info-api: basic-info-api

redisParam:
  host: 192.168.1.207
  port: 6379
  pool:
    maxIdle: 20
    maxTotal: 200
    maxWaitMillis: -1
    testOnBorrow: false
    testOnReturn: false
logback-test.xml
<?xml version="1.0" encoding="UTF-8"?>
<configuration debug="true">
    <contextName>logback</contextName>
    <property name="LOG_HOME" value="/logconsumer"/>

    <!-- basicInfoApi -->
    <appender name="basicInfoApiAppender" class="ch.qos.logback.core.rolling.RollingFileAppender">
        <file>${LOG_HOME}/basic-info-api/logback.log</file>
        <rollingPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedRollingPolicy">
            <fileNamePattern>${LOG_HOME}/basic-info-api/logback-%d{yyyy-MM-dd}.%i.log</fileNamePattern>
            <!--<fileNamePattern>${LOG_HOME}/e9/e9-%d{yyyy-MM-dd}.%i.tar.gz</fileNamePattern>-->
            <!-- days of log history to keep -->
            <MaxHistory>30</MaxHistory>
            <!-- roll over to a new file once this size is reached -->
            <MaxFileSize>50MB</MaxFileSize>
            <totalSizeCap>10GB</totalSizeCap>
        </rollingPolicy>
        <encoder>
            <pattern>%msg%n</pattern>
            <charset>UTF-8</charset>
        </encoder>
    </appender>

    <!-- async output for basicInfoApi -->
    <appender name="basicInfoApiAsyncFile" class="ch.qos.logback.classic.AsyncAppender">
        <discardingThreshold>0</discardingThreshold>
        <queueSize>2048</queueSize>
        <appender-ref ref="basicInfoApiAppender"/>
    </appender>

    <!-- package of the basicInfoApi consumer -->
    <logger name="org.lzw.logconsumer.consumer.BasicInfoApiConsumer" level="INFO" additivity="false">
        <appender-ref ref="basicInfoApiAsyncFile"/>
    </logger>

    <!-- ############################## divider ############################################ -->
    <appender name="file" class="ch.qos.logback.core.rolling.RollingFileAppender">
        <file>${LOG_HOME}/logconsumer/logback.log</file>
        <rollingPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedRollingPolicy">
            <fileNamePattern>${LOG_HOME}/logconsumer/logback-%d{yyyy-MM-dd}.%i.log</fileNamePattern>
            <!--<fileNamePattern>${LOG_HOME}/front/front-%d{yyyy-MM-dd}.%i.tar.gz</fileNamePattern>-->
            <!-- days of log history to keep -->
            <MaxHistory>30</MaxHistory>
            <!-- roll over to a new file once this size is reached -->
            <MaxFileSize>50MB</MaxFileSize>
            <totalSizeCap>1GB</totalSizeCap>
        </rollingPolicy>
        <encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder">
            <pattern>%date %level [%thread] %logger{36} [%file : %line] %msg%n</pattern>
            <charset>UTF-8</charset>
        </encoder>
    </appender>

    <root level="warn">
        <appender-ref ref="file"/>
    </root>
</configuration>
Startup class LogconsumerApplication.java
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.stereotype.Component;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * User: laizhenwei
 */
@SpringBootApplication
public class LogconsumerApplication {

    public static void main(String[] args) {
        SpringApplication.run(LogconsumerApplication.class, args);
    }

    @Component
    public static class ConsumersStartup implements CommandLineRunner {

        ExecutorService executorService = Executors.newCachedThreadPool();

        @Autowired
        private BasicInfoApiConsumer basicInfoApiConsumer;

        @Override
        public void run(String... strings) {
            // Run the blocking consumer loop on its own thread.
            executorService.execute(() -> basicInfoApiConsumer.writeLog());
        }
    }
}
RedisService.java
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;

import javax.annotation.PostConstruct;

@Component
public class RedisService {

    Logger logger = LoggerFactory.getLogger(this.getClass());

    @Value("${redisParam.host}")
    private String host;
    @Value("${redisParam.port}")
    private Integer port;
    @Value("${redisParam.pool.maxIdle}")
    private Integer maxIdle;
    @Value("${redisParam.pool.maxTotal}")
    private Integer maxTotal;
    @Value("${redisParam.pool.maxWaitMillis}")
    private Integer maxWaitMillis;
    @Value("${redisParam.pool.testOnBorrow}")
    private Boolean testOnBorrow;
    @Value("${redisParam.pool.testOnReturn}")
    private Boolean testOnReturn;

    private static JedisPoolConfig config = new JedisPoolConfig();
    private static JedisPool pool;

    @PostConstruct
    public void init() {
        config.setMaxIdle(maxIdle);
        config.setMaxTotal(maxTotal);
        config.setMaxWaitMillis(maxWaitMillis);
        config.setTestOnBorrow(testOnBorrow);
        config.setTestOnReturn(testOnReturn);
        pool = new JedisPool(config, host, port);
    }

    public String brpop(int timeOut, String key) {
        Jedis jedis = null;
        try {
            jedis = pool.getResource();
            // BRPOP returns a two-element list [key, value]; index 1 is the popped message.
            return jedis.brpop(timeOut, key).get(1);
        } catch (Exception ex) {
            logger.warn("redis consume error", ex);
            return "redis consume error";
        } finally {
            if (jedis != null) jedis.close();
        }
    }
}
BasicInfoApiConsumer.java
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

/**
 * Log consumer
 */
@Component
public class BasicInfoApiConsumer {

    private Logger logger = LoggerFactory.getLogger(this.getClass());

    @Value("${logKey.basic-info-api}")
    private String logKey;

    @Autowired
    private RedisService redisService;

    public void writeLog() {
        while (true) {
            // Block until an entry is available (timeout 0 = wait forever),
            // then hand it to logback, which writes it to the rolling file.
            logger.info(redisService.brpop(0, logKey));
        }
    }
}
Grab any application to run it with — here I use basic-info-api.
pom.xml
<dependency>
    <groupId>org.lzw</groupId>
    <artifactId>logqueue</artifactId>
    <version>1.0-SNAPSHOT</version>
</dependency>
<dependency>
    <groupId>redis.clients</groupId>
    <artifactId>jedis</artifactId>
    <version>2.9.0</version>
</dependency>
logback.xml
<?xml version="1.0" encoding="UTF-8"?>
<configuration debug="true">
    <contextName>logback</contextName>
    <property name="log.path" value="/home/logs/basic-info-api/logback.log"/>

    <appender name="redisAppender" class="org.lzw.log.appender.RedisAppender">
        <filter class="ch.qos.logback.classic.filter.ThresholdFilter">
            <level>warn</level>
        </filter>
        <queueKey>basic-info-api</queueKey>
        <redisProperties>
            host=192.168.1.207
            port=6379
        </redisProperties>
        <layout class="ch.qos.logback.classic.PatternLayout">
            <pattern>%date %level [%thread] %logger{36} [%file : %line] %msg%n</pattern>
        </layout>
    </appender>

    <appender name="localAppender" class="ch.qos.logback.core.rolling.RollingFileAppender">
        <filter class="ch.qos.logback.classic.filter.ThresholdFilter">
            <level>warn</level>
        </filter>
        <file>${log.path}</file>
        <rollingPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedRollingPolicy">
            <fileNamePattern>${log.path}.%d{yyyy-MM-dd}.%i.tar.gz</fileNamePattern>
            <!-- days of log history to keep -->
            <MaxHistory>30</MaxHistory>
            <!-- roll over to a new file once this size is reached -->
            <MaxFileSize>50MB</MaxFileSize>
            <totalSizeCap>10GB</totalSizeCap>
        </rollingPolicy>
        <encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder">
            <pattern>%date %level [%thread] %logger{36} [%file : %line] %msg%n</pattern>
            <charset>UTF-8</charset>
        </encoder>
    </appender>

    <appender name="asyncLocal" class="ch.qos.logback.classic.AsyncAppender">
        <!-- Never discard: by default, once the queue is 80% full, TRACE/DEBUG/INFO events are dropped -->
        <discardingThreshold>0</discardingThreshold>
        <queueSize>2048</queueSize>
        <appender-ref ref="localAppender"/>
    </appender>

    <appender name="console" class="ch.qos.logback.core.ConsoleAppender">
        <filter class="ch.qos.logback.classic.filter.ThresholdFilter">
            <level>debug</level>
        </filter>
        <encoder>
            <pattern>%d{HH:mm:ss.SSS} %contextName [%thread] %-5level %logger{36} - %msg%n</pattern>
        </encoder>
    </appender>

    <!-- If the Redis queue is unreachable, fall back to the local file -->
    <logger name="local" additivity="false">
        <appender-ref ref="asyncLocal"/>
    </logger>

    <appender name="asyncRedisAppender" class="ch.qos.logback.classic.AsyncAppender">
        <!-- Never discard: by default, once the queue is 80% full, TRACE/DEBUG/INFO events are dropped -->
        <discardingThreshold>0</discardingThreshold>
        <queueSize>2048</queueSize>
        <appender-ref ref="redisAppender"/>
    </appender>

    <root level="warn">
        <appender-ref ref="asyncRedisAppender"/>
    </root>

    <logger name="org.springframework.session.web.http.SessionRepositoryFilter" level="error"/>
    <logger name="org.springframework.scheduling" level="error"/>
    <logger name="org.apache.catalina.util.LifecycleBase" level="error"/>
    <logger name="org.springframework.amqp" level="warn"/>
</configuration>
Write a long log message:
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.client.circuitbreaker.EnableCircuitBreaker;
import org.springframework.cloud.netflix.eureka.EnableEurekaClient;
import org.springframework.stereotype.Component;

import java.util.Collections;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

@Slf4j
@EnableEurekaClient
@EnableCircuitBreaker
@SpringBootApplication
public class BasicInfoApiApplication {

    public static void main(String[] args) {
        SpringApplication.run(BasicInfoApiApplication.class, args);
    }

    @Component
    public static class ConsumersStartup implements CommandLineRunner {

        ExecutorService executorService = Executors.newCachedThreadPool();

        // A deliberately long payload: the same endpoint list over and over. The original
        // spelled this out as one huge literal concatenation; it is rebuilt here as
        // roughly the same number of copies (36) for readability.
        String msg = String.join("", Collections.nCopies(36,
                "--endpoints=https://192.168.91.138:2379,https://192.168.91.139:2379,https://192.168.91.140:2379"));

        @Override
        public void run(String... strings) {
            long begin = System.nanoTime();
            for (int i = 0; i < 10000; i++)
                executorService.execute(() -> log.warn(msg));
            executorService.shutdown();
            for (;;) {
                if (executorService.isTerminated()) {
                    // Print elapsed wall-clock time in milliseconds.
                    System.out.println((System.nanoTime() - begin) / 1000000);
                    break;
                }
            }
        }
    }
}
Output: 1328 ms. That is over a second just to get 10,000 entries into the queue, which still feels slow. And at that point nothing has been fully written to disk yet — looking at the consumer program, it is still steadily draining the queue.
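To see how far behind the consumer is at any moment, one can poll the length of the Redis list. A minimal standalone sketch (hypothetical, not part of the project; it assumes the same Redis host and queue key used above):

import redis.clients.jedis.Jedis;

public class QueueBacklogCheck {
    public static void main(String[] args) {
        // Assumed host/port and key, matching the configuration above.
        try (Jedis jedis = new Jedis("192.168.1.207", 6379)) {
            // LLEN = entries pushed by the appender but not yet popped by the consumer.
            System.out.println("pending log entries: " + jedis.llen("basic-info-api"));
        }
    }
}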
