JAVA多線程統計日志計數時的線程安全及效率問題


最近工作上遇到一個需求:需要根據nginx日志去統計每個域名的qps(Query Per Second,每秒查詢率)數據。

解決了日志讀取等問題之后,為了寫一個盡可能高效的統計模塊,我決定用多線程去計數,然后將統計結果保存在Map中。用多線程去計數的需求還是比較常見的。

HashMap 線程不安全,操作時只能加synchronized,結果還是單線程的計數,效率太低。ConcurrentHashMap是線程安全的,就用它了。

先看第一版代碼:

 1     // 先定義一個全局的Map
 2     private Map<String, Integer> counter = new ConcurrentHashMap<>();
 3 
 4     // 統計方法
 5     private static final String separator = "|-|";
 6     public void countLog(NginxLog nginxLog) {
 7         String key = nginxLog.getHost() + separator + nginxLog.getDate();
 8         // 先取一下之前的值,然后再加一插入進去
 9         Integer oldValue = counter.putIfAbsent(key, 1);
10         if (oldValue != null) {
11             counter.put(key, oldValue.intValue() + 1);
12         }
13     }

這段統計代碼顯然是不行的,ConcurrentHashMap雖然是線程安全類,並且也能保證所提供的方法是線程安全的,但是這並不代表使用它你的程序就是線程安全的。

在這段代碼中counter.putIfAbsent()操作是原子性操作,counter.put()也是原子操作。但兩者組合起來這就產生問題了。

我們舉個例子:比如說現在有兩個線程先后運行到counter.putIfAbsent()方法,然后兩個線程都取到了同樣的oldValue值,假設此值為10,然后兩個線程都將執行counter.put()方法,此時兩個線程都是在執行counter.put(key, 11)。這顯然是不合理的,計數次數理應為12的。

為了解決這個問題,我想到了兩種思路:

1、給countLog方法加上synchronized同步,如此使用ConcurrentHashMap就沒有多大必要了,改成HashMap好了,這就是最開始的思路,代碼如下:

 1     private Map<String, Integer> counter = new HashMap<>();
 2 
 3     public synchronized void countLog(NginxLog nginxLog) {
 4         String key = nginxLog.getHost() + separator + nginxLog.getDate();
 5         // 先取一下之前的值,然后再加一插入進去
 6         Integer oldValue = counter.putIfAbsent(key, 1);
 7         if (oldValue != null) {
 8             counter.put(key, oldValue.intValue() + 1);
 9         }
10     }

執行測試運行結果為:

2017-11-28 14:43:17,292  INFO NginxLogCountTest - count: 100000, costTime: 23 ms

因為加了同步鎖,相當於計數都是單線程在進行的,因此統計結果也是正確的,耗時23ms

2、第二種思路,使用AtomicInteger類計數。ConcurrentHashMap和AtomicInteger類組合。代碼如下: 

1     private Map<String, AtomicInteger> counter = new ConcurrentHashMap<>();
2 
3     public void countLog(NginxLog nginxLog) {
4         String key = nginxLog.getHost() + separator + nginxLog.getDate();
5         AtomicInteger oldValue = counter.putIfAbsent(key, new AtomicInteger(1));
6         if (oldValue != null) {
7             oldValue.incrementAndGet();
8         }
9     }

執行測試運行結果為: 

2017-11-28 14:53:14,655  INFO NginxLogCountTest - count: 100000, costTime: 11 ms

這種解決方案里面將AtomicInteger和ConcurrentHashMap組合到一起,counter.putIfAbsent()執行后可以獲得當前值的AtomicInteger對象,這個時候使用AtomicInteger對象的incrementAndGet方法。這種組合相當於將兩步操作分擔給兩個線程安全類來處理了。

從執行時間來看相對於單線程計數也還是有一定優勢的。 

最后附上測試用的代碼:

 1     @Resource
 2     private NginxLogCount nginxLogCount;
 3 
 4     @Test
 5     public void testCountLog() {
 6         NginxLog nginxLog = new NginxLog();
 7         nginxLog.setHost("test.com");
 8         nginxLog.setDate("2017-11-28T11:43:46+08:00");
 9         String key = nginxLog.getHost() + "|-|" + nginxLog.getDate();
10         long startTime = System.currentTimeMillis();
11         for (int j = 0; j < 10; j++) {
12             Thread thread = new Thread(new Runnable() {
13                 @Override
14                 public void run() {
15                     for (int i = 0; i < 10000; i++) {
16                         nginxLogCount.countLog(nginxLog);
17                     }
18                 }
19             });
20             thread.setDaemon(false);
21             thread.start();
22         }
23         long endTime = System.currentTimeMillis();
24         try {
25             // 等待運行結束
26             TimeUnit.SECONDS.sleep(10);
27         } catch (InterruptedException e) {
28             e.printStackTrace();
29         }
30         log.info("count: {}, costTime: {} ms", nginxLogCount.getValue(key), endTime - startTime);
31     }
32 
33     // getValue方法
34     public int getValue(String key) {
35         AtomicInteger value = counter.get(key);
36         return value == null ? 0 : value.intValue();
37     }

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM