Java 異步處理簡單實踐


同步與異步

       通常同步意味着一個任務的某個處理過程會對多個線程在用串行化處理,而異步則意味着某個處理過程可以允許多個線程同時處理。

       異步通常代表着更好的性能,因為它很大程度上依賴於緩沖,是典型的使用空間換時間的做法,例如在計算機當中,高速緩存作為cpu和磁盤io之間的緩沖地帶協調cpu高速計算能力和磁盤的低速讀寫能力。

volatile

       應用場景:檢查一個應用執行關閉或中斷狀態。因為此關鍵字拒絕了虛擬對一個變量多次賦值時的優化從而保證了虛擬機一定會檢查被該關鍵字修飾的變量的狀態變化

CountDownLatch

       應用場景:控制在一組線程操作執行完成之前當前線程一直處於等待。例如在主線程中執行await()方法阻塞主線程,在工作線程執行完邏輯后執行countDown()方法

本文示例場景:

       1,從控制台發送消息到消息服務器(由一個隊列模擬)。

       2,將消息隊列寫入到文件(對寫文件的操作設置延時以模擬性能瓶頸)。

       3,消息服務器作為控制台和文件寫入之間的緩沖區。

 

示例代碼:

      注:往消息隊列添加消息可以通過for循環一次性加入,本文為了便於觀察文件和隊列的變化而采用了控制台輸入,實際寫一行文件記錄速度應該高於手速,所以本文示例中增加了線程sleep時間。

package org.wit.ff.ch2;

import java.io.BufferedWriter;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.util.Scanner;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/**
 * 
 * <pre>
 * 簡單異步處理示例.
 * </pre>
 * 
 * @author F.Fang
 * @version $Id: AsyncHandler.java, v 0.1 2014年10月23日 下午11:37:54 F.Fang Exp $
 */
public class AsyncHandler {

    /**
     * 控制資源釋放.
     */
    private CountDownLatch latch;

    /**
     * 處理完成標識.
     */
    private volatile boolean handleFinish;

    /**
     * 消息寫入本地文件完成.
     */
    private volatile boolean sendFinish;

    /**
     * 阻塞隊列.
     */
    private BlockingQueue<String> queue;

    private BufferedWriter bw;

    public AsyncHandler(CountDownLatch latch) {
        this.latch = latch;
        /**
         * 使用鏈表實現.
         */
        queue = new LinkedBlockingQueue<String>();
        File file = new File("E:/hello.txt");
        try {
            bw = new BufferedWriter(new FileWriter(file));
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    public void handle() {
        // 模擬性能瓶頸的執行過程,3s處理一條消息.
        new Thread() {
            public void run() {
                while (!handleFinish) {
                    try {
                        TimeUnit.SECONDS.sleep(3);
                    } catch (InterruptedException e1) {
                        // 不做處理.
                    }
                    String s = queue.peek();
                    if (s != null) {
                        queue.poll();
                        try {
                            bw.write(s);
                            bw.newLine();
                        } catch (IOException e) {
                        }
                    }
                    // 若隊列為空並且消息發送完成.
                    if (queue.isEmpty() && sendFinish) {
                        // 計數器1->0
                        latch.countDown();
                        // 讓處理過程結束.
                        handleFinish = true;
                        break;
                    }
                }
            }

        }.start();

    }

    /**
     * 
     * <pre>
     * 給出消息發送完成的標識.
     * </pre>
     *
     */
    public void sendFinish() {
        sendFinish = true;
    }

    /**
     * 
     * <pre>
     * 資源釋放.
     * </pre>
     *
     */
    public void release() {
        System.out.println("release!");
        if (bw != null) {
            try {
                bw.close();
            } catch (IOException e) {
                // TODO 打印日志.
            }
        }
        //其實使用queue = null就夠了.
        if (queue != null) {
            queue.clear();
            queue = null;
        }
    }

    /**
     * 
     * <pre>
     * 往隊列發送消息.
     * </pre>
     *
     * @param text
     */
    public void sendMsg(String text) {
        if (text != null && !text.isEmpty()) {
            queue.add(text);
        }
    }

    public static void main(String[] args) {
        CountDownLatch latch = new CountDownLatch(1);
        AsyncHandler handler = new AsyncHandler(latch);
        handler.handle();

        // 做一次檢查.
        Scanner scanner = new Scanner(System.in);
        while (true) {
            String text = scanner.next();
            // 若用戶選擇退出.
            if ("exit".equals(text)) {
                // 表示消息已經發送完成.
                handler.sendFinish();
                break;
            }
            handler.sendMsg(text);
        }

        try {
            // 阻塞主線程等待消息寫入到本地文件完成.
            latch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        // 釋放資源 文件流,隊列.
        handler.release();
        // 關閉控制台輸入.
        scanner.close();
    }

}

 

   

       


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM