Java使用多線程發送消息


  在后台管理用戶信息的時候,經常會用到批量發送提醒消息,首先想到的有:

     (1)、循環發送列表,逐條發送。優點是:簡單,如果發送列表很少,而且沒有什么耗時的操作,是比較好的一種選擇,缺點是:針對大批量的發送列表,不可取,耗時,程序會出現嚴重的阻塞問題。

     (2)、使用隊列(BlockingQueue),開啟多個線程,分為三個部分。一部分負責處理將發送列表放入隊列;一部分負責從隊列中讀取並發送消息;第三部分負責監視隊列是否為空及后續的操作。

     (3)、以下說到的這種模式,使用Future、Callable來返回發送結果,覺得是一種比較好的方式,很簡單代碼也很詳細,就不介紹了。

代碼如下:

public class PublishMsgTest {
    //創建固定大小為100 的線程池
    private static ExecutorService service = Executors.newFixedThreadPool(100);
    
    //發送消息的業務邏輯方法
    public int sendMsg(List<Integer> receivers,String content){
        long begin = System.nanoTime();
        AtomicInteger ai = new AtomicInteger(0);
        List<Future<Integer>> list = new ArrayList<>();
        //循環發送消息
        for(int i=0;i<receivers.size();i++){
            Integer receiver = receivers.get(i);
            //使用Future,Callable實現發送消息后返回發送結果
            Future<Integer> future = service.submit(new Callable<Integer>() {
                @Override
                public Integer call() throws Exception {
                    //調用相對比較耗時的發送消息接口
                    Thread.sleep(200);
                    //發送消息
                    int resultStatus = sendMsg(receiver,content);
                    System.out.println("接收者【"+receiver+"】,發送結果【"+resultStatus+"】");
                    return resultStatus;
                }
            });
            list.add(future);
        }
        System.out.println("-----------------------"+(System.nanoTime()-begin)/1000_000d+"-----------------------");
        //循環接收發送結果,相當於一個使線程同步的過程,這個過程是比較耗時的
        for(int i=0;i<list.size();i++){
            try {
                int resultStatus = list.get(i).get();
                if(resultStatus == 0){//發送成功
                    ai.incrementAndGet();
                }
            } catch (Exception e) {
                e.printStackTrace();
            } 
        }
        System.out.println("發送消息結束,耗時:"+(System.nanoTime()-begin)/1000_000d);
        return ai.get();
    }
    
    public static void main(String[] args){
        PublishMsgTest pmt = new PublishMsgTest();
        //待接收人
        List<Integer> receivers = new ArrayList<Integer>();
        for(int i=0;i<1000;i++){
            receivers.add(i);
        }
        String content = "群發消息_測試代碼";
        int successCount = pmt.sendMsg(receivers, content);
        System.out.println("共有【"+receivers.size()+"】接收者,發送成功【"+successCount+"】");
    }

    //完成發送消息
    private int sendMsg(Integer receiver, String content) {
        if(receiver%2 == 0){//模擬被2整除,即為發送成功
            return 0;
        }
        return 1;
    }

 

以上代碼的執行結果:

-----------------------14.786889-----------------------
接收者【2】,發送結果【0】
接收者【3】,發送結果【1】
接收者【4】,發送結果【0】
接收者【0】,發送結果【0】
接收者【6】,發送結果【0】
接收者【7】,發送結果【1】
接收者【1】,發送結果【1】
接收者【5】,發送結果【1】
接收者【10】,發送結果【0】
.
.
.
.
接收者【994】,發送結果【0】
接收者【993】,發送結果【1】
接收者【992】,發送結果【0】
接收者【996】,發送結果【0】
接收者【999】,發送結果【1】
接收者【998】,發送結果【0】
接收者【997】,發送結果【1】
發送消息結束,耗時:2033.053433
共有【1000】接收者,發送成功【500】

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM