多線程編程學習五(線程池的創建)


一、概述

在開發過程中,線程池可以帶來如下好處:

  1. 降低資源消耗。通過重復利用已創建的線程降低線程創建和銷毀造成的消耗。
  2. 提高響應速度。當任務到達時,任務可以不需要等到線程創建就能立即執行。
  3. 提高線程的可管理性。線程是稀缺資源,如果無限制地創建,不僅會消耗系統資源,還會降低系統的穩定性,使用線程池可以進行統一分配、調優和監控。

New Thread的弊端如下:
       a、每次New Thread新建對象性能差。
       b、線程缺乏統一的管理,可能無限制的新建線程,相互之間競爭,極可能占用過多的系統資源導致死機 或者 OOM。
       c、缺乏更多功能,如定時執行、定期執行、線程中斷。

Java提供的四種線程池的好處在於:
       a、重用存在的線程,減少對象創建、消亡的開銷,性能佳。
       b、可有效控制最大並發線程數、提供系統資源的使用率,同時避免過多資源競爭,避免堵塞。
       c、提供定時執行、定期執行、單線程、並發數控制等功能。

二、Executors 創建線程池

Java通過Executors提供四種線程池,分別為:

newCachedThreadPool 創建一個可緩存線程池,如果線程池長度超過處理需要,可靈活回收空閑線程,若無可回收,則新建線程。
newFixedThreadPool 創建一個定長線程池,可控制線程最大並發數,超出的線程會在隊列中等待,表示同一時刻只能有這么大的並發數
newScheduledThreadPool 創建一個定時線程池,支持定時及周期性任務執行。
newSingleThreadExecutor 創建一個單線程化的線程池,它只會用唯一的工作線程來執行任務,保證所有任務按照指定順序(FIFO, LIFO, 優先級)執行。

三、ThreadPoolExecutor 創建線程池

線程池不建議使用 Executors 去創建,而是通過 ThreadPoolExecutor 的方式,這樣的處理方式讓寫的同學更加明確線程池的運行規則,規避資源耗盡的風險。 說明:Executors各個方法的弊端:
      1、 newFixedThreadPool 和 newSingleThreadExecutor:
       主要問題是堆積的請求處理隊列可能會耗費非常大的內存,甚至OOM。
      2、newCachedThreadPool 和 newScheduledThreadPool:
       主要問題是線程數最大數是Integer.MAX_VALUE,可能會創建數量非常多的線程,甚至OOM。

這里介紹三種創建線程池的方式:

Example 1:

    //org.apache.commons.lang3.concurrent.BasicThreadFactory
    ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(1,new BasicThreadFactory.Builder().namingPattern("example-schedule-pool-%d").daemon(true).build());

Example 2:

    ThreadFactory namedThreadFactory = new ThreadFactoryBuilder().setNameFormat("demo-pool-%d").build();

    //Common Thread Pool
    ExecutorService pool = new ThreadPoolExecutor(5, 200,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>(1024), namedThreadFactory, new ThreadPoolExecutor.AbortPolicy());

    pool.execute(()-> System.out.println(Thread.currentThread().getName()));
    pool.shutdown();//gracefully shutdown

Example 3:

    <bean id="userThreadPool" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
        <property name="corePoolSize" value="10" />
        <property name="maxPoolSize" value="100" />
        <property name="queueCapacity" value="2000" />
        <property name="threadFactory" value= threadFactory />
        <property name="rejectedExecutionHandler">
            <ref local="rejectedExecutionHandler" />
        </property>
    </bean>
    //in code
    userThreadPool.execute(thread);

 

tips:ThreadPoolExecutor詳解可以參考:https://www.cnblogs.com/jmcui/p/11552583.html

四、自建線程池

    我們要建一個簡單的線程池,它預先創建了若干數量的線程,並且不能由用戶直接對線程的創建進行控制,在這個前提下重復使用固定或較為固定數目的線程來完成任務的執行。這樣做的好處是,一方面,消除了頻繁創建和消亡線程的系統資源開銷,另一方面,面對過量任務的提交能夠平緩的劣化。

public interface ThreadPool<Job extends Runnable> {

    /**
     * 執行一個Job,這個Job需要實現Runnable
     *
     * @param job
     */
    void execute(Job job);

    /**
     * 關閉線程池
     */
    void shutdown();

    /**
     * 增加工作者線程
     *
     * @param num
     */
    void addWorkers(int num);

    /**
     * 減少工作者線程
     *
     * @param num
     */
    void removeWorker(int num);

    /**
     * 得到正在等待執行的任務數量
     *
     * @return
     */
    int getJobSize();
}
ThreadPool
public class DefaultThreadPool<Job extends Runnable> implements ThreadPool<Job> {
    /**
     * 線程池最大限制數、默認的數量、最小的數量
     */
    private static final Integer MAX_WORKER_NUMBERS = 10;
    private static final Integer DEFAULT_WORKER_NUMBERS = 5;
    private static final Integer MIN_WORKER_NUMBERS = 1;
    /**
     * 這是一個待工作列表,將會向里面插入工作
     */
    private final LinkedList<Job> jobs = new LinkedList<>();
    /**
     * 工作者列表(固定數目的線程,不斷去執行  jobs 中的任務)
     */
    private final List<Worker> workers = Collections.synchronizedList(new ArrayList<Worker>());
    /**
     * 工作者線程的數量
     */
    private int workerNum = DEFAULT_WORKER_NUMBERS;
    /**
     * 線程編號生成
     */
    private AtomicLong threadNum = new AtomicLong();

    public DefaultThreadPool() {
        initializeWokers(DEFAULT_WORKER_NUMBERS);
    }

    public DefaultThreadPool(int num) {
        workerNum = num > MAX_WORKER_NUMBERS ? MAX_WORKER_NUMBERS : num < MIN_WORKER_NUMBERS ? MIN_WORKER_NUMBERS : num;
        initializeWokers(workerNum);
    }

    @Override
    public void execute(Job job) {
        if (job != null) {
            // 添加一個工作,然后進行通知
            synchronized (jobs) {
                jobs.addLast(job);
                jobs.notify();
            }
        }
    }

    @Override
    public void shutdown() {
        for (Worker worker : workers) {
            worker.shutdown();
        }
    }

    @Override
    public void addWorkers(int num) {
        synchronized (jobs) {
            // 限制新增的Worker數量不能超過最大值
            if (num + this.workerNum > MAX_WORKER_NUMBERS) {
                num = MAX_WORKER_NUMBERS - this.workerNum;
            }
            initializeWokers(num);
            this.workerNum += num;
        }
    }

    @Override
    public void removeWorker(int num) {
        synchronized (jobs) {
            if (num >= this.workerNum) {
                throw new IllegalArgumentException("beyond workNum");
            }
            //按照給定的數量停止Worker
            int count = 0;
            while (count < num) {
                // 每次都移除第一個線程
                Worker worker = workers.get(0);
                if (workers.remove(worker)) {
                    worker.shutdown();
                    count++;
                }
            }
            this.workerNum -= count;
        }
    }

    @Override
    public int getJobSize() {
        return jobs.size();
    }

    /**
     * 初始化線程工作者
     *
     * @param num
     */
    private void initializeWokers(int num) {
        for (int i = 0; i < num; i++) {
            Worker worker = new Worker();
            workers.add(worker);
            Thread thread = new Thread(worker, "ThreadPool-Worker-" + threadNum.incrementAndGet());
            thread.start();
        }
    }

    /**
     * 工作者,負責消費任務
     */
    class Worker implements Runnable {
        /**
         * 是否工作
         */
        private volatile boolean running = true;

        @Override
        public void run() {
            while (running) {
                Job job;
                synchronized (jobs) {
                    // 如果工作者列表是空的,那么就wait
                    while (jobs.isEmpty()) {
                        try {
                            jobs.wait();
                        } catch (InterruptedException ex) {
                            // 感知到外部對 WorkerThread 的中斷操作,返回
                            Thread.currentThread().interrupt();
                            return;
                        }
                    }
                    //取出一個Job
                    job = jobs.removeFirst();
                }
                if (job != null) {
                    try {
                        job.run();
                    } catch (Exception ex) {
                        // 忽略Job執行中的Exception
                    }
                }
            }
        }

        public void shutdown() {
            running = false;
        }
    }
}
DefaultThreadPool

    可以看到,線程池的本質就是使用了一個線程安全的工作隊列(workers)連接工作者線程和客戶端線程,客戶端線程將任務(job)放入工作隊列后便返回,而工作者線程則不斷地從工作隊列上取出工作並執行。當工作隊列為空時,所有的工作者線程均等待在工作隊列上,當有客戶端提交了一個任務之后會通知任意一個工作者線程,隨着大量的任務被提交,更多的工作者線程會被喚醒。

    我們利用自建的線程池來構造一個簡單的 Web 服務器,這個 Web 服務器用來處理 HTTP 請求,目前只能處理簡單的文本和 JPG 圖片內容。這個 Web 服務器使用 main 線程不斷地接受客戶端 Socket 的連接,將連接以及請求提交給線程池處理,這樣使得 Web 服務器能夠同時處理多個客戶端請求。

public class SimpleHttpServer {

    /**
     * 處理HttpRequest的線程池
     */
    static ThreadPool<HttpRequestHandler> THREAD_POOL = new DefaultThreadPool<>(1);
    /**
     * SimpleHttpServer的根路徑(可以理解成 Tomcat 的 Root 目錄)
     */
    static String basePath;
    static ServerSocket serverSocket;
    /**
     * 服務監聽端口
     */
    static int port = 8080;

    public static void setPort(int port) {
        if (port > 0) {
            SimpleHttpServer.port = port;
        }
    }

    public static void setBasePath(String basePath) {
        if (basePath != null && new File(basePath).exists() && new File(basePath).isDirectory()) {
            SimpleHttpServer.basePath = basePath;
        }
    }

    /**
     * 啟動SimpleHttpServer
     *
     * @throws Exception
     */
    public static void start() throws Exception {
        serverSocket = new ServerSocket(port);
        Socket socket;
        while ((socket = serverSocket.accept()) != null) {
            // 接收一個客戶端Socket,生成一個HttpRequestHandler,放入線程池執行
            THREAD_POOL.execute(new HttpRequestHandler(socket));
        }
        serverSocket.close();
    }


    static class HttpRequestHandler implements Runnable {

        private Socket socket;

        public HttpRequestHandler(Socket socket) {
            this.socket = socket;
        }

        @Override
        public void run() {
            // socket 輸入
            BufferedReader reader = null;
            // socket 輸出
            PrintWriter out = null;
            BufferedReader br = null;
            InputStream in = null;
            try {
                reader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
                String header = reader.readLine();
                // 由相對路徑計算出絕對路徑
                String filePath = basePath + header.split("\\s+")[1];
                out = new PrintWriter(socket.getOutputStream());
                // 如果請求資源的后綴為jpg或者ico,則讀取資源並輸出
                if (filePath.endsWith("jpg") || filePath.endsWith("ico")) {
                    in = new FileInputStream(filePath);
                    ByteArrayOutputStream baos = new ByteArrayOutputStream();
                    int i;
                    while ((i = in.read()) != -1) {
                        baos.write(i);
                    }
                    byte[] array = baos.toByteArray();
                    out.println("HTTP/1.1 200 OK");
                    out.println("Server: Molly");
                    out.println("Content-Type: image/jpeg");
                    out.println("Content-Length: " + array.length);
                    out.println("");
                    socket.getOutputStream().write(array, 0, array.length);
                } else {
                    br = new BufferedReader(new InputStreamReader(new FileInputStream(filePath)));
                    out = new PrintWriter(socket.getOutputStream());
                    out.println("HTTP/1.1 200 OK");
                    out.println("Server: Molly");
                    out.println("Content-Type: text/html; charset=UTF-8");
                    out.println("");
                    String line = null;
                    while ((line = br.readLine()) != null) {
                        out.println(line);
                    }
                }
                out.flush();
            } catch (Exception ex) {
                out.println("HTTP/1.1 500");
                out.println("");
                out.flush();
            } finally {
                close(br, in, reader, out, socket);
            }
        }
    }

    /**
     * 關閉流或者Socket
     *
     * @param closeables
     */
    private static void close(Closeable... closeables) {
        if (closeables != null) {
            for (Closeable closeable : closeables) {
                try {
                    closeable.close();
                } catch (Exception ex) {
                }
            }
        }
    }


    public static void main(String[] args) throws Exception {
        SimpleHttpServer.setPort(-1);
        SimpleHttpServer.setBasePath("D:\\");
        SimpleHttpServer.start();
    }
}
SimpleHttpServer

    以上案例參考自《Java 並發編程的藝術》


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM