一:前言
寫這個程序主要是用來理解生產者消費者模型,以及通過這個Demo來理解Redis的單線程取原子任務是怎么實現的和鞏固一下並發相關的知識;這個雖然是個Demo,但是只要稍加改下Appender部分也是可以用於項目中的,假設項目里確實不需要log4j/logback之類的日志組件的時候;
二:實現方式
1.利用LinkedList作為MQ(還可以用jdk自帶的LinkedBlockingQueue,不過這個Demo主要是為了更好的理解原理因此寫的比較底層);
2.利用一個Daemon線程作為消費者從MQ里實時獲取日志對象/日志記錄,並將它提交給線程池,由線程池再遍歷所有的appender並調用它們的通知方法,這個地方還可以根據場景進行效率優化,如將循環遍歷appender改為將每個appender都再此提交到線程池實現異步通知觀察者;
3.為生產者提供log方法作為生產日志記錄的接口,無論是生產日志對象還是消費日志對象在操作隊列時都需要對隊列加鎖,因為個人用的是非並發包里的;
4.消費者在獲取之前會先判斷MQ里是否有數據,有則獲取並提交給線程池處理,否則wait;
5.生產者生產了日志對象后通過notify通知消費者去取,因為只有一個消費者,而生產者是不會wait的因此只需要notify而不用notifyAll
6.。。剩下的就看代碼來說明吧;
三:代碼
1.MyLogger類的實現
package me.silentdoer.mqlogger.log; import java.io.IOException; import java.io.PrintWriter; import java.io.Writer; import java.text.SimpleDateFormat; import java.util.*; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; import static me.silentdoer.mqlogger.log.MyLogger.LogLevel.DEBUG; import static me.silentdoer.mqlogger.log.MyLogger.LogLevel.ERROR; /** * @author silentdoer * @version 1.0 * @description 這里只是做一個簡單的logger實現,不提供Appender之類的功能,主要是用來學習生產者和消費者及MQ的實現原理 * @date 4/26/18 6:07 PM */ public class MyLogger{ private LogLevel loggerLevel = DEBUG; private String charset = "UTF-8"; // 暫且沒用,但是當需要序列化時是可能用到的; // TODO 也可以直接用LinkedQueue,然后手動通過ReentrantLock來實現並發時的數據安全(synchronized也可) //private BlockingQueue<LogRecord> queue = new LinkedBlockingQueue<LogRecord>(); // 可以理解為支持並發的LinkedList // TODO 想了一下既然是要學習原理干脆就實現的更底層一點 private final Queue<LogRecord> records = new LinkedList<LogRecord>(); // TODO 用於記錄生產了多少條日志,可供外部獲取 private AtomicLong produceCount = new AtomicLong(0); // TODO 用於記錄消費了多少條日志 private AtomicLong consumeCount = new AtomicLong(0); // TODO 日志記錄的Consumer private Thread consumer = new LogDaemon(); public MyLogger(){ consumer.setDaemon(true); consumer.start(); } /** * 對外提供的接口,即log方法就是生產者用於生產日志數據的接口 * @param msg * @param level */ public void log(String msg, LogLevel level){ Date curr = generateCurrDate(); log(new LogRecord(level, msg, curr)); } /** * 對外提供的接口,即log方法就是生產者用於生產日志數據的接口 * @param msg */ public void log(String msg){ Date curr = generateCurrDate(); log(new LogRecord(this.loggerLevel, msg, curr)); } /** * 給生產者(即調用log的方法都可以理解為生產者在生產日志對象)提供用於生產日志記錄的接口 * @param record */ public void log(LogRecord record){ // ReentrantLock可以替代synchronized,不過當前場景下synchronized已經足夠 synchronized (this.records){ // TODO 如果用的是LinkedBlockingQueue是不需要這個的 this.records.offer(record); this.produceCount.incrementAndGet(); this.records.notify(); // TODO 只有一個線程會records.wait(),因此notify()足夠 } } // TODO 類似Redis的那個單線程,用於讀取命令對象,而這里則是用於讀取LogRecord並通過appender將數據寫到相應位置 private class LogDaemon extends Thread{ private volatile boolean valid = true; // 充當appenders的角色 private List<Writer> appenders = null; private ExecutorService threadPool = new ThreadPoolExecutor(1, 3 , 180000, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(1024)); @Override public void run() { while(this.valid){ // TODO 根據最少知道原則,在這里不要去想整體里是否存在打斷此線程的地方,你就認為此線程是可能被外界打斷的即可,因此需要做一定處理 try { synchronized (MyLogger.this.records) { if (MyLogger.this.records.size() <= 0) { MyLogger.this.records.wait(); } final LogRecord firstRecord = MyLogger.this.records.poll(); MyLogger.this.consumeCount.incrementAndGet(); //threadPool.submit() threadPool.execute(() -> MyLogger.this.notifyAppender(this.appenders, firstRecord)); } }catch (InterruptedException ex){ this.valid = false; ex.printStackTrace(); }catch (Throwable t){ t.printStackTrace(); } } } } private void notifyAppender(final List<Writer> appenders, final LogRecord record) { if(appenders == null){ PrintWriter writer = new PrintWriter(record.level == ERROR ? System.err : System.out); writer.append(record.toString()); writer.flush(); }else{ // TODO 這種是同步的方式,如果是異步的方式可以將每個appender的執行都由一個Runnable對象包裝,然后submit給線程池(或者中間加個中間件) for(Writer writer : appenders){ try { writer.append(record.toString()); }catch (IOException ex){ ex.printStackTrace(); } } } } /** * 用於產生當前時間的模塊,防止因為並發而導致LogRecord的timestamp根實際情況不符 */ private Lock currDateLock = new ReentrantLock(); // 直接用synchronized亦可 private Date generateCurrDate(){ currDateLock.lock(); Date result = new Date(); currDateLock.unlock(); return result; } // 生產者生產的數據對象 public static class LogRecord{ private LogLevel level; private String msg; private Date timestamp; private static final SimpleDateFormat DEFAULT_DATE_FORMAT = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss:SSS"); private SimpleDateFormat dateFormat = DEFAULT_DATE_FORMAT; /*public LogRecord(){ this(INFO, ""); }*/ public LogRecord(LogLevel level, String msg){ this(level, msg, new Date()); // 還是最好由外界設置timestamp,否則高並發下會比較不准 } // TODO 最好用這個,不然高並發下timestamp容易出現順序不准確的情況。 public LogRecord(LogLevel level, String msg, Date timestamp){ this.level = level; this.msg = msg; this.timestamp = timestamp; } @Override public String toString(){ return String.format("[Level:%s, Datetime:%s] : %s\n", level, dateFormat.format(timestamp), msg); } public LogLevel getLevel() { return level; } public String getMsg() { return msg; } public void setDateFormat(SimpleDateFormat dateFormat) { this.dateFormat = dateFormat; } public void setTimestamp(Date timestamp) { this.timestamp = timestamp; } } public enum LogLevel{ // TODO 內部enum默認就是static INFO, DEBUG, ERROR } public LogLevel getLoggerLevel() { return loggerLevel; } public void setLoggerLevel(LogLevel loggerLevel) { this.loggerLevel = loggerLevel; } public String getCharset() { return charset; } public void setCharset(String charset) { this.charset = charset; } public AtomicLong getProduceCount() { return produceCount; } public AtomicLong getConsumeCount() { return consumeCount; } }
2.測試用例1
package me.silentdoer.mqlogger; import me.silentdoer.mqlogger.log.MyLogger; import java.util.Scanner; /** * @author silentdoer * @version 1.0 * @description the description * @date 4/26/18 10:13 PM */ public class Entrance { private static MyLogger logger = new MyLogger(); public static void main(String[] args){ //logger.setLoggerLevel(MyLogger.LogLevel.ERROR); Scanner scanner = new Scanner(System.in); String line; while(!(line = scanner.nextLine()).equals("exit")){ if(line.equals("")) continue; logger.log(line); System.out.println(String.format("共生產了%s條日志。", logger.getConsumeCount())); try { Thread.sleep(500); }catch (InterruptedException ex){ } System.out.println(String.format("共消費了%s條日志。", logger.getProduceCount())); } } }
3.測試用例2
package me.silentdoer.mqlogger; import me.silentdoer.mqlogger.log.MyLogger; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; /** * @author silentdoer * @version 1.0 * @description the description * @date 4/26/18 10:32 PM */ public class Entrance2 { private static MyLogger logger = new MyLogger(); public static void main(String[] args){ logger.setLoggerLevel(MyLogger.LogLevel.ERROR); ExecutorService threadPool = Executors.newCachedThreadPool(); for(int i=0;i<10;i++){ final int index = i + 1; threadPool.execute(() -> { logger.log(String.format("生產的第%s條記錄。", index)); System.out.println(String.format("共生產了%s條記錄。", index)); }); try { Thread.sleep(100); }catch (InterruptedException ex){ } } try { Thread.sleep(3000); System.out.println(String.format("共%s條記錄被消費。", logger.getConsumeCount())); }catch (InterruptedException ex){ } //threadPool.shutdown(); //threadPool.shutdownNow(); } }
四:補充
如果想實現像BlockingQueue一樣能夠控制MQ的元素個數范圍,則可以通過ReentrantLock的Confition來實現,即通過lock創建兩個Condition對象,一個用來描述是否MQ中元素達到上限的情況,一個用於描述MQ中元素降到下限的情況;
無論是達到上限或降到下限都會通過相應的condition對象來阻塞對應的生產者或消費者的生產/消費過程從而實現MQ元素個數的可控性;