關於Java串行、並行執行——使用Callable多線程


一.使用Callable多線程:

通過Callable接口實現多線程

實現Callable重寫call方法;

實現Callable和實現Runnable類似,但是功能更強大,具體表現在:

a.可以在任務結束后提供一個返回值,Runnable不行;

b.call方法可以拋出異常,Runnable的run方法不行;

c.可以通過運行Callable得到的Fulture對象監聽目標線程調用call方法的結果,得到返回值,(fulture.get(),調用后會阻塞,直到獲取到返回值);

 

1Callable接口介紹:

(1)java.util.concurrent.Callable是一個泛型接口,只有一個call()方法;

(2)call()方法拋出異常Exception異常,且返回一個指定的泛型類對象;

 

2、Callable接口實現多線程的應用場景

(1)當父線程想要獲取子線程的運行結果時;

 

3、使用Callable接口實現多線程的步驟

(1)第一步:創建Callable子類的實例化對象;

(2)第二步:創建FutureTask對象,並將Callable對象傳入FutureTask的構造方法中(注意:FutureTask實現了Runnable接口和Future接口);

  (3)第三步:實例化Thread對象,並在構造方法中傳入FurureTask對象;

  (4)第四步:啟動線程;

 

二.需求:

做一個報表功能:在一個方法中查詢多個數據庫表的結果,然后匯總返回;

由於單獨查詢一個數據庫表速度較慢(大字段查詢),此時如果串行查詢多個表的話效率會非常低,所以需要多線程同時查詢數據庫,等全部查詢完畢后再匯總!

DAO層:

package com.moerlong.yj.mapper;

import org.apache.ibatis.annotations.Select;
import org.springframework.stereotype.Repository;

@Repository
public interface BaseMapper {

    @Select("select hf_industry_record from msd_hf_industry_person_srcdata where card_id = #{cardId}")
    public String selectIndustryData(String cardId);

    @Select("select hf_judicial_record from msd_hf_judicial_person_srcdata where card_id = #{cardId}")
    public String selectJudicialData(String cardId);

    @Select("select td_record from msd_td_preloan_srcdata where card_id = #{cardId}")
    public String selectTdData(String cardId);
}

Service層:

串行執行:

package com.moerlong.yj.Service;

import com.moerlong.yj.mapper.BaseMapper;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;


@Service
public class TestNoCallable {

    @Autowired
    private BaseMapper baseMapper;

    public String test1(String cardId) throws Exception{
        long start = System.currentTimeMillis();

        //三個串行查詢
        String industryData = baseMapper.selectIndustryData(cardId);
        String judicialData = baseMapper.selectJudicialData(cardId);
        String tdData = baseMapper.selectTdData(cardId);

        Thread.sleep(3000);     //此處模擬每個查詢添加1s耗時

        String result = industryData + judicialData + tdData;

        long end = System.currentTimeMillis();
        System.out.println("串行執行:" + (end-start));

        return result;
    }
}

並行執行:

package com.moerlong.yj.Service;

import com.moerlong.yj.mapper.BaseMapper;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.lang.reflect.Method;
import java.util.concurrent.*;


@Service
public class TestCallable {
    @Autowired
    private BaseMapper baseMapper;

    public String test2(String cardId) throws Exception{
        // 三個線程的線程池,核心線程=最大線程,沒有臨時線程,阻塞隊列無界
        ExecutorService executorService = Executors.newFixedThreadPool(3);

        long start = System.currentTimeMillis();

        // 開啟線程執行
        // 注意,此處Future對象接收線程執行結果不會阻塞,只有future.get()時候才會阻塞(直到線程執行完返回結果)
        Future future1 = executorService.submit(new SelectTask<>(this, "selectIndustryData", new Object[]{cardId}));

        Future future2 = executorService.submit(new SelectTask<>(this, "selectJudicialData", new Object[]{cardId}));

        Future future3 = executorService.submit(new SelectTask<>(this, "selectTdData", new Object[]{cardId}));

        //此處用循環保證三個線程執行完畢,再去拼接三個結果
        do{
            System.out.println("多任務同時執行中...");
        }while (!(future1.isDone() && future2.isDone() && future3.isDone()));

        String result = (String)future1.get() + future2.get() + future3.get();

        long end = System.currentTimeMillis();
        System.out.println("並行執行:" + (end-start));

        return result;

    }

    //下面是三個真正執行任務(查數據庫)的方法
    public String selectIndustryData(String cardId) throws Exception{
        String result = baseMapper.selectIndustryData(cardId);
        Thread.sleep(1000);    //模擬添加1s耗時
        return result;
    }

    public String selectJudicialData(String cardId) throws Exception{
        String result = baseMapper.selectJudicialData(cardId);
        Thread.sleep(1000);
        return result;
    }

    public String selectTdData(String cardId) throws Exception{
        String result = baseMapper.selectTdData(cardId);
        Thread.sleep(1000);
        return result;
    }

    //任務線程類
    class SelectTask<T> implements Callable<T> {

        private Object object;
        private Object[] args;
        private String methodName;

        public SelectTask(Object object, String methodName, Object[] args) {
            this.object = object;
            this.args = args;
            this.methodName = methodName;
        }

        @Override
        public T call() throws Exception {
            Method method = object.getClass().getMethod(methodName,String.class);   //此處應用反射機制,String.class是根據實際方法參數設置的
            return (T) method.invoke(object, args);
        }
    }
}

控制層:

package com.moerlong.yj.controller;


import com.moerlong.yj.Service.TestCallable;
import com.moerlong.yj.Service.TestNoCallable;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;


@RestController
public class TestController {
@Autowired
private TestNoCallable testNoCallable; @Autowired private TestCallable testCallable; @RequestMapping(value = "/test1/{cardId}") public String test1(@PathVariable String cardId) throws Exception{ String result = testNoCallable.test1(cardId); return result; } @RequestMapping(value = "/test2/{cardId}") public String test2(@PathVariable String cardId) throws Exception{ String result = testCallable.test2(cardId); return result; } }

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM