拜托!不要再問我是否了解多線程了好嗎


  面試過程中,各面試官一般都會教科書式的問你幾個多線程的問題,但又不知從何問起。於是就來一句,你了解多線程嗎?拜托,這個好傷自尊的!

  相信老司機們對於java的多線程問題處理,穩如老狗了。你問我了解不?都懶得理你。

  不過,既然是面對的是面試官,那你還得一一說來。

  今天我們就從多個角度來領略下多線程技術吧!

1. 為什么會有多線程?

  其實有的語言是沒有多線程的概念的,而java則是從一出生便有了多線程天賦。為什么?
  多線程技術一般又被叫做並發編程,目的是為了程序運行得更快。
  其基本原理是,是由cpu進行不同線程的調度,從而實現多個線程的同時運行效果。
  多進程和多線程類似,只是多進程不會共享內存資源,切換開銷更大,所以多線程是更明智的選擇。
  而在計算機出現早期,或者也許你也能找到單核的cpu,這時候的多線程是通過不停地切換唯一一個可以運行的線程來實現的,由於切換速度比較快,所以感覺就是多線程同時在運行了。在這種情況下,多線程與多進程等同的。但是,至少也讓用戶有了可以同時處理多任務的能力了,也是很有用的。
  而當下的多核cpu時代,則是真正可以同時運行多個線程的時代,什么四核八線程,八核八線程.... 意味着可以同時並行n個線程。如果我們能讓所有可用的線程都利用起來,那么我們的程序運行速度或者說整體性能將會得到極大提升。這是我們技術人員的目標。

 

2. 多線程就一定快嗎?(簡略)

  看起來,多線程確實挺好,但是凡事皆有度。過尤不及。

  如果只運行與cpu能力范圍內的n線程,那是絕對ok的。但當你線程數超過這個n時,就會涉及到cpu的調度問題,調度時即會涉及一個上下文切換問題,這是要耗費時間和資源的東西。當cpu疲於奔命調度切換時,則多線程就是一個負擔了。

 

3. 多線程主要注意什么問題?(簡略)

  多線程要注意的問題多了去了,畢竟這是一門不簡單的學問,但是我們也可以總結下:

  1. 線程安全性問題;如果連正確性都無法保障,談性能有何意義?
  2. 資源隔離問題;是你就是你的,不是你的就不是你的。
  3. 可讀性問題;如果為了多線程,將代碼搞得一團糟,是否值得?
  4. 外部環境問題;如果外部環境很糟糕,那么你內部性能再好,你能把壓力給外部嗎?

 

4. 創建多線程的方式?(簡略)

  這個問題確實有點low, 不過也是一個體現真實實踐的地方!

  1. 繼承Thread類,然后 new MyThread.start();
  2. 繼承Runnable類, 然后 new Thread(runnable).start();
  3. 繼承Callable類,然后使用 ExecutorService.submit(callable);
  4. 使用線程池技術,直接創建n個線程,將上面的方法再來一遍,new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); 簡化版: Executors.newFixedThreadPool(n).submit(runnable);

 

5. 來點實際的場景?(重點)

  理論始終太枯燥,不如來點實際的。

  有同學說,我平時就寫寫業務代碼,而業務代碼基本由用戶觸發,一條線程走到底,哪來的多線程實踐?

  好,我們可以就這個問題來說下,這種業務的多線程:

  1. 比如一個http請求,對應一個響應,如果不使用多線程,會怎么樣?我們可以簡單地寫一個socket服務器,進行處理業務,但是這絕對不是你想看到的。比如我們常用的 spring+tomcat, 哪里沒有用到多線程技術?

        http-nio-8080-exec-xxx #就是一個線程池中例子。

  2. 任何一個java應用,啟動起來之后,都會有很多的GC線程運行,這難道不是多線程?如:

        "G1 Main Concurrent Mark GC Thread" os_prio=0 tid=0x00007fb91008f000 nid=0x40e7 runnabl
        "Gang worker#0 (Parallel GC Threads)" os_prio=0 tid=0x00007fb910061800 nid=0x40de runnable

  如上這些多線程場景吧,面試官說,就算你了解其原理,那也不算是你的。你有真正使用過多線程嗎?

  接下來,我們就來說道說道,實際業務場景中,有哪些是我們可能會用上的,供大家參考:

看下多線程中幾個有趣或者經典的場景用法!

  場景1. 我有一個發郵件的功能,用戶操作成功后,我給他發送郵件,如何高效穩定地完成?

  場景2. 我有m個線程在循環執行主方法,為實現高效處理,將分離n*m個子線程執行相關聯流程,要求子線程必須等到主線程執行完成后才能執行,如何保證?

  場景3. 某合作公司要求請求其api的qps不得大於n,如何保證?

  場景4. 一個大任務如何提高響應速度?

  場景5. 我有n個線程同時開始處理一個事務,要求至少等到一個線程執行完畢后,才能進行響應返回,如何高效處理?

  場景6. 抽象任務,后台運行處理任務多線程?

 

大家應該已經見過世面了,這點問題還不至於,對吧。那你可以拿出你的方案了。


下面是我的解決方案:

 

場景1. 我有一個發郵件的功能,用戶操作成功后,我給他發送郵件,如何高效穩定地完成?
場景1解決:(常規型)

  這個可以說最實用最簡單的多線程應用場景了,不過現在進行微服務化之后,可能會有一些不同。換湯不換葯。

  針對C端用戶的多線程,我們是不建議使用 new Thread() 這種方式的,線程池是個常用伎倆。

    ExecutorService mailExecutors = Executors.newFixedThreadPool(20);

    public void sendMail() {
        mailExecutors.submit(() -> {
            // do send mail biz, http, rpc,...
            System.out.println("sending mail");
        });
    }

 

場景2. 我有m個線程在循環執行主方法,為實現高效處理,將分離n*m個子線程執行相關聯流程,要求子線程必須等到主線程執行完成后才能執行,如何保證?
場景2解決:(所有等待型)

  主任務,只管調度子線程,在子線程使用閉鎖在適當的地方進行等待,主線程循環分配完成后,打開閉鎖,放行所有子線程即可。

  具體代碼如下:

    private void mainWork() {
        try {
            resetRedisZsetLockGate();
            for (String linkTraceCacheKey : expiredKeys) {
                subWork(linkTraceCacheKey);
            }
        }
        finally {
            releaseRedisZsetLock();
        }
    }
    
    private void subWork(String linkTraceCacheKey) {
        deleteService.execute(new Runnable() {
            @Override
            public void run() {
                // do other biz
                blockingWaitRedisZsetLock();
                postSth(linkTraceCacheKey);
            }
        });
    }
    
    /**
     * 重置鎖網關,每次主方法的調度都將得到一個私有的鎖
     */
    private void resetRedisZsetLockGate() {
        redisZsetScanLockGate = new CountDownLatch(1);
    }
    
    /**
     * 阻塞等待 鎖
     */
    private void blockingWaitRedisZsetLock() {
        final CountDownLatch myGate = redisZsetScanLockGate;
        try {
            myGate.await();
        } 
        catch (InterruptedException e) {
            logger.error("等待鎖中斷異常", e);
            Thread.currentThread().interrupt();
        }
    }

    /**
     * 釋放鎖
     */
    private void releaseRedisZsetLock() {
        final CountDownLatch myGate = redisZsetScanLockGate;
        myGate.countDown();
    }

 

場景3. 某合作公司要求請求其api的qps不得大於n,如何保證?
場景3解決:(流量控制型、有限資源型)

  這種問題准確的說,使用單機的多線程還是有點難控制的,但是我們只是為了講清道理,具體(集群)做法只要稍做變通即可。

  簡單點說,就是作用一個 Semphore 信號量進行數量控制,當數量未到時,直接多線程並發請求,到達限制后,則等待有空閑位置再進行!

public class AbstractConcurrentSimpleLiteJobBase {
    /**
     * 並發查詢:5 , 動態配置化
     */
    private final Semaphore maxConcurrentQueryLock;

    /**
     * 同步等待結束鎖,視情況使用,同一個線程可能提交多次任務,由同一個 holder 管理
     */
    private final ThreadLocal<List<Future<?>>> endGateTaskFutureContainer = new ThreadLocal<>();

    @Resource
    private ThreadPoolTaskExecutor threadPoolTaskExecutor;

    public AbstractConcurrentSimpleLiteJobBase() {
        maxConcurrentQueryLock = new Semaphore(getMaxConcurrentThreadNum());
    }

    /**
     * 獲取最大允許的並發數,子類可自定義, 默認:5
     *
     * @return 最大並發數
     */
    protected int getMaxConcurrentThreadNum() {
        return 5;
    }
    
    /**
     * 提交一個任務到線程池執行
     *
     * @param task 任務
     */
    protected void submitTask(Runnable task) {
        // 考慮是否要阻塞等待結果
        Future<?> future1 =  threadPoolTaskExecutor.submit(() -> {
            try {
                maxConcurrentQueryLock.acquire();
            }
            catch (InterruptedException ie) {
                // ignore...
                log.error("【任務運行】異常,中斷", ie);
                Thread.currentThread().interrupt();
                return;
            }
            try {
                task.run();
            }
            finally {
                maxConcurrentQueryLock.release();
            }
        });
        endGateCountDown(future1);
    }
    
    /**
     * 等待線程結果完成,並清理 gate 信息
     */
    private void awaitForComplete() {
        try {
            // 同步等待執行完成,防止並發任務執行
            for(Future<?> future1 : endGateTaskFutureContainer.get()) {
                future1.get();
            }
            endGateTaskFutureContainer.remove();
        }
        catch (ExecutionException e) {
            log.error("【任務執行】異常,拋出異常", e);
        }
        catch (InterruptedException e) {
            log.error("【任務執行】異常,中斷", e);
        }
    }


}

 

場景4. 一個大任務如何提高響應速度?
場景4解決:(大任務拆分型)

  針對大任務的處理,基本想到的都是類似於分布式計算之類的東西(map/reduce),在java單機操作來說,標准的解決方案是 Fork/Join 框架。

public class MyForkJoinTask extends RecursiveTask<Integer> {
    //原始數據
    private List<Integer> records;

    public MyForkJoinTask(List<Integer> records) {
        this.records = records;
    }

    @Override
    protected Integer compute() {
        //任務拆分到可接受程度后,運行處理邏輯
        if (records.size() < 3) {
            return doRealCompute();
        }
        // 否則一直往下拆分任務
        int size = records.size();
        MyForkJoinTask aTask = new MyForkJoinTask(records.subList(0, size / 2));
        MyForkJoinTask bTask = new MyForkJoinTask(records.subList(size / 2, records.size()));
        //兩個任務並發執行
        invokeAll(aTask, bTask);
        //結果合並
        return aTask.join() + bTask.join();
    }

    /**
     * 真正任務處理邏輯
     */
    private int doRealCompute() {
        try {
            Thread.sleep((long) (records.size() * 1000));
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("計算任務:" + Arrays.toString(records.toArray()));
        return records.size();
    }

    // 測試任務
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ForkJoinPool forkJoinPool = new ForkJoinPool(5);
        List<Integer> originalData = new ArrayList<>();
        originalData.add(1);
        originalData.add(2);
        originalData.add(3);
        originalData.add(4);
        originalData.add(5);
        originalData.add(6);
        originalData.add(7);
        originalData.add(8);
        originalData.add(9);
        originalData.add(10);
        originalData.add(11);
        originalData.add(12);
        originalData.add(13);

        MyForkJoinTask myForkJoinTask = new MyForkJoinTask(originalData);
        long t1 = System.currentTimeMillis();
        ForkJoinTask<Integer> affectNums = forkJoinPool.submit(myForkJoinTask);
        System.out.println("affect nums: " + affectNums.get());
        long t2 = System.currentTimeMillis();
        System.out.println("cost time: " + (t2-t1));
    }
}

  其實如果不用Fork/join 框架,也是可以的,比如我就只開n個線依次從數據源處取數據進行處理,最后將結果合並到另一個隊列中。只是,這期間你得多付出多少努力才能做到 Fork/Join 相同的效果呢!

  當然了,Fork/Join 的重要特性是: 使用了work-stealing算法。Worker線程跑完任務后,可以從其他還在忙着的線程去竊取任務。

  你要願意造輪子,也是可以的。

 

場景5. 我有n個線程同時開始處理一個事務,要求至少等到一個線程執行完畢后,才能進行響應返回,如何高效處理?
場景5解決:(至少一個返回型)

  初步思路: 主任務中,使用一個閉鎖,CountDownLatch(1); 所有子線程執行完成,調用 latch.countDown(); 開啟一次閉鎖。主任務執行完成后,調用 latch.await(); 阻塞等待,當有任意一個子線程打開閉鎖后,就可以返回了。

  但是這個是有問題的,即這個鎖只會有一次生效機會,后續的完成動作並不會有實際意義,因此只能換一個方式。

  使用回調實現,就容易多了,只要一個任務完成,就做一次回調,主任務如果分配完成后,發現有空閑的任務槽,就立即進行下一次分配即可,沒有則等到有再進行分配工作。

  具體代碼如下:

public class TaskDispatcher {
    /** Main lock guarding all access */
    final ReentrantLock lock;

    /** Condition for waiting assign */
    private final Condition finishedTaskNotEmpty;

    /**
     * 正在運行的任務計數器
     */
    private final AtomicInteger runningTaskCounter = new AtomicInteger(0);

    /**
     * 新完成的任務計數器,當被重新分派后,此計數將會被置0
     */
    private Integer newFinishedTaskCounter = 0;
    
    private void consumLogHub(String shards) throws InterruptedException {
        resetConsumeCounter();
        String[] shardList = shards.split(",");
        for (int i = 0; i < shardList.length; i++) {
            String shard = shardList[i];
            int shardId = Integer.parseInt(shard);
            LogHubConsumer consuemr = getConsuemer(shardId);
            if(consuemr.startNewConsumeTask(this)) {
                runningTaskCounter.incrementAndGet();
            }
        }
        cleanConsumer(Arrays.asList(shardList));
        // 沒有一個任務已完成,阻塞等待一個完成
        if(runningTaskCounter.get() > 0) {
            if(newFinishedTaskCounter == 0) {
                waitAtLeastOnceTaskFinish();
            }
        }
    }
    
    /**
     * 重置消費者計數器
     */
    private void resetConsumeCounter() {
        newFinishedTaskCounter = 0;
    }

    /**
     * 阻塞等待至少一個任務執行完成
     *
     * @throws InterruptedException 中斷
     */
    private void waitAtLeastOnceTaskFinish() throws InterruptedException {
        lock.lockInterruptibly();
        try {
            while (newFinishedTaskCounter == 0) {
                finishedTaskNotEmpty.await();
            }
        }
        finally {
            lock.unlock();
        }
    }

    /**
     * 通知任務完成(回調)
     *
     * @throws InterruptedException 中斷
     */
    private void notifyTaskFinished() throws InterruptedException {
        lock.lockInterruptibly();
        try {
            runningTaskCounter.decrementAndGet();
            // 此處計數不可能小於0
            newFinishedTaskCounter += 1;
            finishedTaskNotEmpty.signal();
        }
        finally {
            lock.unlock();
        }
    }
    /**
     * 通知任務完成(回調)
     *
     * @throws InterruptedException 中斷
     */
    public void taskFinishCallback() throws InterruptedException {
        notifyTaskFinished();
    }
    
}

public class ConsumerWorker {

    private Future<?> future;
    
    @Resource
    private ExecutorService consumerService; 
    
    /**
     * 當查詢結果為時的等待延時, 每次查詢結果都會為空時,加大該延時, 直到達到設定的最大值為准
     */
    private Long baseEmptyQueryDelayMills = 200L;
    private Long emptyQueryDelayMills = baseEmptyQueryDelayMills;

    /**
     * 調置最大延時為1秒
     */
    private static final Long maxEmptyQueryDelayMills = 1000L;

    /**
     * 記數
     */
    private void encounterEmptyQueryDelay() {
        if(emptyQueryDelayMills < maxEmptyQueryDelayMills) {
            emptyQueryDelayMills += 100L;
        }
    }

    private void resetEmptyQueryDelay() {
        emptyQueryDelayMills = baseEmptyQueryDelayMills;
    }


    // 開啟一個消費者線程
    public boolean startNewConsumeTask(LogHubClientWork callback) {
        if(future==null || future.isCancelled() || future.isDone()) {
            //沒有任務或者任務已取消或已完成 提交任務
            future = consumerService.submit(new Runnable() {
                @Override
                public void run() {
                    try {
                        Integer dealCount = doBizData();
                        if(dealCount == 0) {
                            SleepUtil.millis(emptyQueryDelayMills);
                            encounterEmptyQueryDelay();
                        }
                        else {
                            resetEmptyQueryDelay();
                        }
                    }
                    finally {
                        try {
                            callback.taskFinishCallback();
                        }
                        catch (InterruptedException e) {
                            logger.error("處理完成通知失敗,中斷", e);
                            Thread.currentThread().interrupt();
                        }
                    }
                }
            });
            return true;
        }
        return false;
    }
    
}

 


場景6. 抽象任務,后台運行處理任務多線程?
場景6解決:(業務相關類)

  最簡單也是最難的一種,根據具體業務類型做相應處理就好,主要考慮讀寫的安全性問題。

 

  如上幾個多線程的應用場景,是我在工作中切實用上的場景(所言非虛)。不過它們都有一個特點,即任務都是很獨立的,即基本上不用太關心線程安全問題,這也是我們編寫多線程代碼時盡量要做的事。當然很多場景共享數據是一定的,這時候就更要注意線程安全了。

  要做到線程安全也不是難事,比如足夠好的封裝,可以讓你把關注點鎖定在很小的范圍內。

  當然,為了線程安全,我們可能往往又會犧牲性能,這就看我們如何把握這些度了!互斥鎖是最容易使用的鎖,但是也是性能最差的鎖。分段鎖能夠解決鎖性能問題,但是又會給編寫帶來更大的困難。

 

  多線程,不止要會寫,還要會給自己填坑。

 

嘮叨: 去追天邊的那束光!

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM