曹工雜談:我們的應用,啟動就要去其他服務拉數據,那其他服務掛了,我們就起不來了?


曹工雜談:我們的應用,啟動就要去其他服務拉數據,那其他服務掛了,我們就起不來了?

前言

在大家的項目中,想必都有那種,啟動時候要去其他服務拉一些數據的情況,如果我們啟動時,其他服務沒啟動,按豈不是就起不來了嗎,如果這段拉數據的代碼,並不是核心業務,那你這就有點說不過去了:不能因為對方沒啟動,我們也不能啟動吧?

經過一些思考后,我覺得可以這樣,啟動的時候:

  • 啟動一個定時的線程池,讓它去執行拉數據的任務,如果任務執行失敗,會過一段時間后再次執行
  • 我們希望,一旦某一次執行任務,成功后,就不要再去拉數據了,浪費網絡流量和cpu

我這邊可以大概就大家演示下。

示例代碼

服務端

隨便寫了個spring boot服務端,監聽本機8082端口。模擬第三方服務

@RestController
@Slf4j
public class BusinessController {


    @GetMapping("/")
    public String test() {
        return "success";
    }
}

@SpringBootApplication
@Slf4j
public class WebDemoApplicationServer {

	public static void main(String[] args) {
        ConfigurableApplicationContext context = SpringApplication.run(WebDemoApplicationServer.class, args);
    }

}

客戶端

客戶端程序,依賴第三方服務,啟動時,要去上面的服務端拉數據。

代碼和上面差不多,唯一是在啟動時,會執行以下邏輯:

@Component
public class InitRunner implements  CommandLineRunner{
   private static final Logger log = LoggerFactory.getLogger(InitRunner.class);

    @Autowired
    private RestTemplate restTemplate;

    @Override
    public void run(String... args) throws Exception {
        ResponseEntity<String> entity = restTemplate.getForEntity("http://localhost:8082", String.class);
        String s = entity.toString();
        log.info("get data:{}",s);
    }
}

在上面的服務沒啟動的時候,這個客戶端是起不來的。

怎么解決呢,很簡單。

方案1

public class InitRunnerV2 implements CommandLineRunner {

    @Autowired
    private RestTemplate restTemplate;
	
  	// 1
    ScheduledThreadPoolExecutor scheduledThreadPoolExecutor =
            new ScheduledThreadPoolExecutor(1, new NamedThreadFactory("init-data-from-third-sys"));

    @Override
    public void run(String... args)  {
      	//2 
        TestTask task = new TestTask(restTemplate);
      	//3 
        ScheduledFuture<?> scheduledFuture = scheduledThreadPoolExecutor.scheduleAtFixedRate(task,
                        0, 10, TimeUnit.SECONDS);
      	// 4
        task.setScheduledFuture(scheduledFuture);
    }


}
  • 1處,new了一個線程池,ScheduledThreadPoolExecutor類型,可周期執行某個任務

  • 2處,new了一個任務,這個任務會執行我們的拉數據邏輯。

    這個任務的代碼如下:

    @Slf4j
    public class TestTask implements Runnable{
        private RestTemplate restTemplate;
    
        private volatile ScheduledFuture<?> scheduledFuture;
    
        public TestTask(RestTemplate restTemplate) {
            this.restTemplate = restTemplate;
        }
    
        ...
    
        public void setScheduledFuture(ScheduledFuture<?> scheduledFuture) {
            this.scheduledFuture = scheduledFuture;
        }
    }
    

    其實很簡單,就是定義了2個字段,一個是RestTemplate,請求數據時要用;另一個是ScheduledFuture<?>類型,這個字段在上面的InitRunnerV2代碼的第三處被賦值。

  • 3處,讓這個任務循環執行,每10s一次。

  • 4處,給task的 ScheduledFuture 賦值,注意的是,在task中,這個字段我們定義為volatile,保證線程可見。

下面是任務代碼的剖析:

@Override
    public void run() {
        try {
            ResponseEntity<String> entity = restTemplate.getForEntity("http://localhost:8082", String.class);
            String s = entity.toString();
            log.info("get data:{}",s);
        } catch (Exception e) {
//            log.error("e:{}",e);
            log.error("error");
            return;
        }

        /**
         * 1 有可能任務執行太快,future還沒被賦值
         */
        if (scheduledFuture != null) {
            scheduledFuture.cancel(true);
        }

    }

唯一有什么要說的,就是1處,如果成功了,我們就會調用scheduledFuture.cancel(true);,這樣,這個scheduled 任務就不會繼續執行了,也就達到了我們的目的,經濟實惠。

到此,代碼基本就這樣了,詳細代碼見:

https://gitee.com/ckl111/all-simple-demo-in-work/tree/master/spring-boot-scheduler-future-demo-parent

不成熟方案2

因為上面的方案挺簡單實用,但感覺沒啥干貨,於是我想着是否可以自己來實現一個定制的線程池,把這些事情給自動化了。

希望實現的最終效果如下,給future增加一個回調,需要在任務執行成功時,該回調自動被調用:

public class InitRunnerV3 implements CommandLineRunner {

    @Autowired
    private RestTemplate restTemplate;

    CustomScheduledThreadPoolExecutor scheduledThreadPoolExecutor =
            new CustomScheduledThreadPoolExecutor(1, new NamedThreadFactory("init-data-from-third-sys"));

    @Override
    public void run(String... args)  {
        // 1
        TestTaskV3 task = new TestTaskV3(restTemplate);
        // 2
        CustomScheduledFuture<?> scheduledFuture = scheduledThreadPoolExecutor.scheduleAtFixedRate(task,
                        0, 10, TimeUnit.SECONDS);
        // 3
        scheduledFuture.setCustomFutureCallBack(new CustomFutureCallBack() {

            @Override
            public void onSuccess(CustomScheduledFuture customScheduledFuture) {
                log.info("onSuccess");
                // 4
                customScheduledFuture.cancel(true);
            }

            @Override
            public void onException(Throwable throwable) {
                log.error("e:{}",throwable);
            }
        });
    }
  • 1處,執行任務,任務內部如下,去除了設置future的邏輯,和取消的邏輯

    
    @Slf4j
    public class TestTaskV3 implements Runnable{
        private RestTemplate restTemplate;
    
        public TestTaskV3(RestTemplate restTemplate) {
            this.restTemplate = restTemplate;
        }
      
        @Override
        public void run() {
            try {
                ResponseEntity<String> entity = restTemplate.getForEntity("http://localhost:8082", String.class);
                String s = entity.toString();
                log.info("get data:{}",s);
            } catch (Exception e) {
    //            log.error("e:{}",e);
                log.error("error");
                throw e;
            }
      
        }
    
    }
    
  • 2處,循環執行任務,這里的scheduled線程池,是我們自定義的,回頭再說;獲取其返回的future

  • 3處,給future增加回調,在回調中,如果成功,則取消該任務。

                @Override
                public void onSuccess(CustomScheduledFuture customScheduledFuture) {
                    log.info("onSuccess");
                    // 4
                    customScheduledFuture.cancel(true);
                }
    

尋找擴展點

這里,afterExecute是個空實現,就是留給子線程池擴展用的:

    protected void afterExecute(Runnable r, Throwable t) { }

那我們可以考慮下,要怎么才能實現我們的目標呢,我們要在這個方法內,通過傳進來的Runnable r,獲取到下面這個future才能實現目的:

        CustomScheduledFuture<?> scheduledFuture = scheduledThreadPoolExecutor.scheduleAtFixedRate(task,
                        0, 10, TimeUnit.SECONDS);

獲取到future,就能拿到在future上設置的callback對象,就能調用callback,所以,現在問題是,要在傳進來的Runnable中,獲取到scheduledFuture

所以,我們就得包裝一下,傳進來的runnable,我們定義了如下的Runnable:

@Data
public class CustomDecoratedRunnable implements Runnable {
    Runnable runnable;

    CustomScheduledFuture customScheduledFuture;

    public CustomDecoratedRunnable(Runnable runnable,CustomScheduledFuture customScheduledFuture) {
        this.runnable = runnable;
        this.customScheduledFuture = customScheduledFuture;
    }

    @Override
    public void run() {
        this.runnable.run();
    }


}

定制線程池

我們具體看看,我們定制的線程池對象,我們的線程池,直接繼承了ScheduledThreadPoolExecutor

public class CustomScheduledThreadPoolExecutor<V> extends ScheduledThreadPoolExecutor {

    public CustomScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory) {
        super(corePoolSize, threadFactory);
    }
  
  	...
}

scheduleAtFixedRate方法,我們進行了重寫:

@Override
    public CustomScheduledFuture<V> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) {
        /**
         * 1 
         */
        CustomScheduledFuture customScheduledFuture = new CustomScheduledFuture();
		// 2 將future設置到task中
        CustomDecoratedRunnable customDecoratedRunnable = new CustomDecoratedRunnable(command,customScheduledFuture);
       // 3
        ScheduledFuture<?> scheduledFuture = super.scheduleAtFixedRate(customDecoratedRunnable,
                initialDelay, period, unit);

        /**
         * 4 將返回的future,設置到我們包裝過的future
         */
        customScheduledFuture.setScheduledFuture((RunnableScheduledFuture) scheduledFuture);

        return customScheduledFuture;
    }
  • 1處,新建一個自定義的future

  • 2處,將自定義的future,設置到上面說的task中

  • 3處,把包裝過的task,丟給線程池

  • 4處,返回一個定制的future,這個future,包裝了原有的future,同時,支持設置callback

    public class CustomScheduledFuture<V> implements RunnableScheduledFuture<V> {
        /**
         * 其實是下面這種類型:
         * {@link java.util.concurrent.ScheduledThreadPoolExecutor.ScheduledFutureTask
         *
         */
        RunnableScheduledFuture<V> scheduledFuture;
      
    	// 設置callback時,賦值
        CustomFutureCallBack customFutureCallBack;
    
        Runnable runnable;
    }
    

丟給定制線程池的task

本來,我以為,丟給線程池什么Runnable對象,在afterExecute就能拿到什么樣的Runnable對象,結果:

發現,傳進來的,已經被包裝過了,應該是為了支持周期執行。

所以,沒辦法,看起來路被堵死了,通過這個傳進來的Runnable,也拿不到我們原始的Runnable。

后邊找了半天,找到下面這個點:

#java.util.concurrent.ScheduledThreadPoolExecutor#scheduleAtFixedRate

public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
                                                  long initialDelay,
                                                  long period,
                                                  TimeUnit unit) {
        if (command == null || unit == null)
            throw new NullPointerException();
        if (period <= 0)
            throw new IllegalArgumentException();
        ScheduledFutureTask<Void> sft =
            new ScheduledFutureTask<Void>(command,
                                          null,
                                          triggerTime(initialDelay, unit),
                                          unit.toNanos(period));
        // 1
        RunnableScheduledFuture<Void> t = decorateTask(command, sft);
        sft.outerTask = t;
        delayedExecute(t);
        return t;
    }
  • 1處,會調用decorateTask來包裝task,默認實現,就是如下:

        protected <V> RunnableScheduledFuture<V> decorateTask(
            Runnable runnable, RunnableScheduledFuture<V> task) {
            return task;
        }
    

    這里的task,就是前面那個代碼里的 ScheduledFutureTask<Void> sft:

            ScheduledFutureTask<Void> sft =
                new ScheduledFutureTask<Void>(command,
                                              null,
                                              triggerTime(initialDelay, unit),
                                              unit.toNanos(period));
            // 1
            RunnableScheduledFuture<Void> t = decorateTask(command, sft);
    

所以,我們得想辦法重載這個方法:

    @Override
    protected <V> RunnableScheduledFuture<V> decorateTask(Runnable runnable, RunnableScheduledFuture<V> task) {
        CustomScheduledFuture<V> future = new CustomScheduledFuture<>();
        future.setRunnable(runnable);
        future.setScheduledFuture(task);
        return future;
    }

這里,利用CustomScheduledFuture,封裝了task和runnable兩個對象。

同時,我們自定義的這個CustomScheduledFuture,也是實現了這個方法的返回值,指定的接口:

@Data
public class CustomScheduledFuture<V> implements RunnableScheduledFuture<V> 


目前為止,經過包裝后,在afterExecute處,拿到的Runnable如下:

afterExecute的邏輯,調用回調

 @Override
    protected void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);
        CustomScheduledFuture future;
        CustomDecoratedRunnable runnable = null;
        if (r instanceof CustomScheduledFuture) {
            future = (CustomScheduledFuture) r;
            // 1
            runnable = (CustomDecoratedRunnable) future.getRunnable();
        }
        // 2
        CustomScheduledFuture customScheduledFuture = runnable.getCustomScheduledFuture();
        // 3
        CustomFutureCallBack customFutureCallBack = customScheduledFuture.getCustomFutureCallBack();
        if (customFutureCallBack != null) {
            if (t != null) {
                customFutureCallBack.onException(t);
            } else {
                // 4
                customFutureCallBack.onSuccess(customScheduledFuture);
            }
        }

    }
  • 1處,獲取runnable
  • 2處,根據runnable,獲取我們的future
  • 3處,通過future,獲取回調
  • 4處,調用回調

效果展示

2020-04-10 09:45:28.068  INFO 14456 --- [           main] No active profile set, falling back to default profiles: default
2020-04-10 09:45:28.822  INFO 14456 --- [           main] Started WebDemoApplication in 1.153 seconds (JVM running for 1.805)
2020-04-10 09:45:36.933 ERROR 14456 --- [init-data-from-third-sys-1-thread-1] error
2020-04-10 09:48:48.975  INFO 14456 --- [init-data-from-third-sys-1-thread-1] onSuccess

可以看到,任務執行失敗了,但為啥會調用onSuccess呢;另外,大家可以看到,都是在線程池的線程中執行的。

為啥會error了,還執行success呢,我發現,即使我在task中拋出了異常,但是上層沒捕獲。

我猜測,是因為:

public interface Runnable {
    /**
     * When an object implementing interface <code>Runnable</code> is used
     * to create a thread, starting the thread causes the object's
     * <code>run</code> method to be called in that separately executing
     * thread.
     * <p>
     * The general contract of the method <code>run</code> is that it may
     * take any action whatsoever.
     *
     * @see     java.lang.Thread#run()
     */
    public abstract void run();
}

這里沒有拋出異常,所以,即使實現的runnable中拋了,上層也不管。

具體還要驗證。

注意點

另一個點是,執行失敗了,等了10s,並沒有再次執行,猜測是我的定制task,導致了周期執行的問題。這個待驗證和解決。

但,一個簡單的回調,我們已經實現了。

總結

大家使用方案1 就可以了;后面的方案,是折騰着玩的。希望對大家有幫助。
全部代碼都在:

https://gitee.com/ckl111/all-simple-demo-in-work/tree/master/spring-boot-scheduler-future-demo-parent


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM