java 使用Queue在隊列中異步執行任務


先創建一個總的Handler(隊列統一處理接口),名字就叫做 QueueTaskHandler

public interface QueueTaskHandler {

    void processData();
}

然后寫一個隊列服務類,就不多做說明了,我的注釋已經寫的很清楚了

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;

@Component
public class QueueGenerationService{

    // 日志監控
    private static final Logger log = LoggerFactory.getLogger(QueueGenerationService.class);
    // 根據業務與服務器性能自行配置 這里我配置的是最多50000個任務
    // LinkedBlockingQueue構造的時候若沒有指定大小,則默認大小為Integer.MAX_VALUE
    private final LinkedBlockingQueue<QueueTaskHandler> tasks = new LinkedBlockingQueue<QueueTaskHandler>(50000);
    // 類似於一個線程總管 保證所有的任務都在隊列之中
    private ExecutorService service = Executors.newSingleThreadExecutor();
    // 檢查服務是否運行
    private volatile boolean running = true;
    //線程狀態
    private Future<?> serviceThreadStatus = null;

    @PostConstruct
    public void init() {
    serviceThreadStatus = service.submit(new Thread(new Runnable() {
        @Override
        public void run() {
        while (running) {
            try {
            //開始一個任務
            QueueTaskHandler task = tasks.take();
            try {
                task.processData();
            } catch (Exception e) {
                log.error("任務處理發生錯誤", e);
            }
            } catch (InterruptedException e) {
            log.error("服務停止,退出", e);
            running = false;
            }
        }
        }
    }, "save data thread"));
    }

    public boolean addData(QueueTaskHandler dataHandler) {
    if (!running) {
        log.warn("service is stop");
        return false;
    }
    //offer 隊列已經滿了,無法再加入的情況下
    boolean success = tasks.offer(dataHandler);
    if (!success) {
        log.warn("添加任務到隊列失敗");
    }
    return success;
    }

  //判斷隊列是否有任務
    public boolean isEmpty() {
      return tasks.isEmpty();
    }
public boolean checkServiceRun() { return running && !service.isShutdown() && !serviceThreadStatus.isDone(); } public void activeService() { running = true; if (service.isShutdown()) { service = Executors.newSingleThreadExecutor(); init(); log.info("線程池關閉,重新初始化線程池及任務"); } if (serviceThreadStatus.isDone()) { init(); log.info("線程池任務結束,重新初始化任務"); } } @PreDestroy public void destory() { running = false; service.shutdownNow(); } }

接下來就可以開始寫你的業務Handler了

public class TestServiceHandler implements QueueTaskHandler {

    // ******* start 這一段並不是必要的,這是示范一個傳值的方式
    private String name;

    private Integer age;

    public TestServiceHandler(String name) {
    this.name = name;
    }

    public TestServiceHandler(Integer age) {
    this.age = age;
    }

    public TestServiceHandler(String name, Integer age) {
    this.name = name;
    this.age = age;
    }

    // ****** end

    // 這里也就是我們實現QueueTaskHandler的處理接口
    @Override
    public void processData() {
    // 可以去做你想做的業務了
    // 這里需要引用spring的service的話,我寫了一個工具類,下面會貼出來
    // ItestService testService = SpringUtils.getBean(ItestService.class);
    System.out.println("name > " + name + "," + "age > " + age);
    }

}

那么我們來在service中添加一個任務

    // 這里注入隊列服務
@Autowired
private QueueGenerationService queueGenerationService;

  // 在方法中調用與傳參的方式
  queueGenerationService.addData(new TestServiceHandler("小明",5));
 

整個過程就結束了,然后在你的業務Handler中如果需要使用其他的bean比如service,那么請試試我寫的這個工具類

import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.stereotype.Component;

@Component
public class SpringUtils implements ApplicationContextAware {

    private static ApplicationContext applicationContext;

    /**
     * @return
     * @Description 獲取applicationContext
     */
    public static ApplicationContext getApplicationContext() {
        return applicationContext;
    }

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        if (SpringUtils.applicationContext == null) {
            SpringUtils.applicationContext = applicationContext;
        }
    }

    /**
     * @param name
     * @return
     * @Description 通過name獲取 Bean.
     */
    public static Object getBean(String name) {
        return getApplicationContext().getBean(name);
    }

    /**
     * @param clazz
     * @return
     * @Description 通過class獲取Bean.
     */
    public static <T> T getBean(Class<T> clazz) {
        return getApplicationContext().getBean(clazz);
    }

    /**
     * @param name
     * @param clazz
     * @return
     * @Description 通過name, 以及Clazz返回指定的Bean
     */
    public static <T> T getBean(String name, Class<T> clazz) {
        return getApplicationContext().getBean(name, clazz);
    }

}

 如果大家有什么不解,或意見,歡迎在下方留言,樓主看到就會回復的,謝謝。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM