Goolge-Guava Concurrent中的Service


最近在學習了下Google的Guava包,發現這真是一個好東西啊。。由於平時也會寫一些基於多線程的東西,所以特意了解了下這個Service框架。這里Guava包里的Service接口用於封裝一個服務對象的運行狀態、包括start和stop等方法。例如web服務器,RPC服務器、計時器等可以實現這個接口。對此類服務的狀態管理並不輕松、需要對服務的開啟/關閉進行妥善管理、特別是在多線程環境下尤為復雜。Guava包提供了一些基礎類幫助你管理復雜的狀態轉換邏輯和同步細節。

概述

一個服務正常生命周期有:

  • Service.State.NEW
  • Service.State.STARTING
  • Service.State.RUNNING
  • Service.State.STOPPING
  • Service.State.TERMINATED
    服務一旦被停止就無法再重新啟動了。如果服務在starting、running、stopping狀態出現問題、會進入Service.State.FAILED.狀態。調用 startAsync()方法可以異步開啟一個服務,同時返回this對象形成方法調用鏈。注意:只有在當前服務的狀態是NEW時才能調用startAsync()方法,因此最好在應用中有一個統一的地方初始化相關服務。停止一個服務也是類似的、使用異步方法stopAsync() 。但是不像startAsync(),多次調用這個方法是安全的。這是為了方便處理關閉服務時候的鎖競爭問題。

Service也提供了一些方法用於等待服務狀態轉換的完成:通過 addListener()方法異步添加監聽器。此方法允許你添加一個 Service.Listener 、它會在每次服務狀態轉換的時候被調用。注意:最好在服務啟動之前添加Listener(這時的狀態是NEW)、否則之前已發生的狀態轉換事件是無法在新添加的Listener上被重新觸發的。

同步使用awaitRunning()。這個方法不能被打斷、不強制捕獲異常、一旦服務啟動就會返回。如果服務沒有成功啟動,會拋出IllegalStateException異常。同樣的, awaitTerminated() 方法會等待服務達到終止狀態(TERMINATED 或者 FAILED)。兩個方法都有重載方法允許傳入超時時間。

Service 接口本身實現起來會比較復雜、且容易碰到一些捉摸不透的問題。因此我們不推薦直接實現這個接口。而是請繼承Guava包里已經封裝好的基礎抽象類。每個基礎類支持一種特定的線程模型。

Service接口的實現

AbstractIdleService

AbstractIdleService在我們服務處於running狀態時,不會做執行任何動作,我們僅僅只有在startup和shutdown的時候才執行一些動作,所以我們在實現這個方法時,只是簡單的實現startUp() 和 shutDown() 這兩個方法即可,在startUp方法中做一些比如初始化,注冊等操作,在shutDown中做一些清理操作等。舉個例子,也就是官網的例子:

protected void startUp() {
  servlets.add(new GcStatsServlet());
}
protected void shutDown() {}

我們在startUp()方法的時候,實例化了一個GcStatsServlet,當我們在運行的時候,會有現成的線程處理這個Servlet,所以在服務運行時就不需要做什么額外動作了。這個比較簡單,就不舉例子了,應該用的情況應該不會很多吧?。。。。

AbstractExecutionThreadService

AbstractExecutionThreadService在單個線程中執行startup, running, and shutdown,我們必須實現run()方法,同事在方法中要能響應停止服務的請求,比如在一個循環中:

import com.google.common.util.concurrent.AbstractExecutionThreadService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.Uninterruptibles;

import java.util.concurrent.TimeUnit;

/**
 * User: hupeng
 * Date: 14-12-22
 * Time: 下午10:17
 */
public class AbstractExecutionThreadServiceTest extends AbstractExecutionThreadService {
    private volatile boolean running = true; //聲明一個狀態

    @Override
    protected void startUp() {
        ////在這里我們可以做一些初始化操作
    }

    @Override
    public void run() {
        while (running) {
            // do our work
            try {
                Uninterruptibles.sleepUninterruptibly(2, TimeUnit.SECONDS);
                System.out.println("do our work.....");
            } catch (Exception e) {
                //處理異常,這里如果拋出異常,會使服務狀態變為failed同時導致任務終止。
            }
        }
    }

   @Override
    protected void triggerShutdown() {
        running = false; //這里我們改變狀態值,run方法中就能夠得到響應。=
        //可以做一些清理操作,也可以移到shutDown()方法中執行
    }

    @Override
    protected void shutDown() throws Exception {
        //可以關閉資源,關閉連接。。。
    }

    public static void main(String[] args) throws InterruptedException {
        AbstractExecutionThreadServiceTest service = new AbstractExecutionThreadServiceTest();

        service.addListener(new Listener() {
            @Override
            public void starting() {
                System.out.println("服務開始啟動.....");
            }

            @Override
            public void running() {
                System.out.println("服務開始運行");;
            }

            @Override
            public void stopping(State from) {
                System.out.println("服務關閉中");
            }

            @Override
            public void terminated(State from) {
                System.out.println("服務終止");
            }

            @Override
            public void failed(State from, Throwable failure) {
                System.out.println("失敗,cause:" + failure.getCause());
            }
        }, MoreExecutors.directExecutor());

        service.startAsync().awaitRunning();
        System.out.println("服務狀態為:" + service.state());

        Thread.sleep(10 * 1000);

        service.stopAsync().awaitTerminated();

        System.out.println("服務狀態為:" + service.state());
    }

}



triggerShutdown() 方法會在執行方法stopAsync調用,startUp方法會在執行startAsync方法時調用,這個類的實現都是委托給AbstractService這個方法實現的。。具體代碼可以自己看一下

AbstractScheduledService

AbstractScheduledService類用於在運行時處理一些周期性的任務。子類可以實現 runOneIteration()方法定義一個周期執行的任務,以及相應的startUp()和shutDown()方法。為了能夠描述執行周期,你需要實現scheduler()方法。通常情況下,你可以使用AbstractScheduledService.Scheduler類提供的兩種調度器:newFixedRateSchedule(initialDelay, delay, TimeUnit) 和newFixedDelaySchedule(initialDelay, delay, TimeUnit),類似於JDK並發包中ScheduledExecutorService類提供的兩種調度方式。如要自定義schedules則可以使用 CustomScheduler類來輔助實現。一個實現類看起來應該是這樣的

import com.google.common.util.concurrent.AbstractScheduledService;
import com.google.common.util.concurrent.MoreExecutors;
import java.util.concurrent.TimeUnit;

/**
 * User: hupeng
 * Date: 14-12-22
 * Time: 下午7:43
 */
public class AbstractScheduledServiceTest extends AbstractScheduledService {


    @Override
    protected void startUp() throws Exception {

    }

    @Override
    protected void shutDown() throws Exception {

    }

    @Override
    protected void runOneIteration() throws Exception {
        // //處理異常,這里如果拋出異常,會使服務狀態變為failed同時導致任務終止
        try {
            System.out.println("do work....");
        } catch (Exception e) {
            //處理異常
        }
    }

    @Override
    protected Scheduler scheduler() {
        return Scheduler.newFixedDelaySchedule(1, 5, TimeUnit.SECONDS);
    }


    public static void main(String[] args) throws InterruptedException {
        AbstractScheduledServiceTest service = new AbstractScheduledServiceTest();

        service.addListener(new Listener() {
            @Override
            public void starting() {
                System.out.println("服務開始啟動.....");
            }

            @Override
            public void running() {
                System.out.println("服務開始運行");
                ;
            }

            @Override
            public void stopping(State from) {
                System.out.println("服務關閉中");
            }

            @Override
            public void terminated(State from) {
                System.out.println("服務終止");
            }

            @Override
            public void failed(State from, Throwable failure) {
                System.out.println("失敗,cause:" + failure.getCause());
            }
        }, MoreExecutors.directExecutor());

        service.startAsync().awaitRunning();
        System.out.println("服務狀態為:" + service.state());

        Thread.sleep(10 * 1000);

        service.stopAsync().awaitTerminated();

        System.out.println("服務狀態為:" + service.state());
    }
}

當然這個Listener的注冊只是為了測試觀察。AbstractScheduledServic默認使用Executors.newSingleThreadScheduledExecutor來執行的

/**
   * Returns the {@link ScheduledExecutorService} that will be used to execute the {@link #startUp},
   * {@link #runOneIteration} and {@link #shutDown} methods.  If this method is overridden the 
   * executor will not be {@linkplain ScheduledExecutorService#shutdown shutdown} when this 
   * service {@linkplain Service.State#TERMINATED terminates} or 
   * {@linkplain Service.State#TERMINATED fails}. Subclasses may override this method to supply a 
   * custom {@link ScheduledExecutorService} instance. This method is guaranteed to only be called 
   * once.
   * 
   * <p>By default this returns a new {@link ScheduledExecutorService} with a single thread thread
   * pool that sets the name of the thread to the {@linkplain #serviceName() service name}.  
   * Also, the pool will be {@linkplain ScheduledExecutorService#shutdown() shut down} when the 
   * service {@linkplain Service.State#TERMINATED terminates} or 
   * {@linkplain Service.State#TERMINATED fails}.
   */
  protected ScheduledExecutorService executor() {
    final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(
        new ThreadFactory() {
          @Override public Thread newThread(Runnable runnable) {
            return MoreExecutors.newThread(serviceName(), runnable);
          }
        });
    // Add a listener to shutdown the executor after the service is stopped.  This ensures that the
    // JVM shutdown will not be prevented from exiting after this service has stopped or failed.
    // Technically this listener is added after start() was called so it is a little gross, but it
    // is called within doStart() so we know that the service cannot terminate or fail concurrently
    // with adding this listener so it is impossible to miss an event that we are interested in.
    addListener(new Listener() {
      @Override public void terminated(State from) {
        executor.shutdown();
      }
      @Override public void failed(State from, Throwable failure) {
        executor.shutdown();
      }
    }, directExecutor());
    return executor;
  }

你可以參照這個實現override這個方法,得到你想要的ScheduledExecutorService。

AbstractService

如需要自定義的線程管理、可以通過擴展 AbstractService類來實現。一般情況下、使用上面的幾個實現類就已經滿足需求了,但如果在服務執行過程中有一些特定的線程處理需求、則建議繼承AbstractService類。

繼承AbstractService方法必須實現兩個方法.

doStart(): 首次調用startAsync()時會同時調用doStart(),doStart()內部需要處理所有的初始化工作、如果啟動成功則調用notifyStarted()方法;啟動失敗則調用notifyFailed()
doStop(): 首次調用stopAsync()會同時調用doStop(),doStop()要做的事情就是停止服務,如果停止成功則調用 notifyStopped()方法;停止失敗則調用 notifyFailed()方法。
doStart和doStop方法的實現需要考慮下性能,盡可能的低延遲。如果初始化的開銷較大,如讀文件,打開網絡連接,或者其他任何可能引起阻塞的操作,建議移到另外一個單獨的線程去處理。

使用ServiceManager

除了對Service接口提供基礎的實現類,Guava還提供了 ServiceManager類使得涉及到多個Service集合的操作更加容易。通過實例化ServiceManager類來創建一個Service集合,你可以通過以下方法來管理它們:

startAsync() : 將啟動所有被管理的服務。如果當前服務的狀態都是NEW的話、那么你只能調用該方法一次、這跟 Service#startAsync()是一樣的。
stopAsync() :將停止所有被管理的服務。
addListener :會添加一個ServiceManager.Listener,在服務狀態轉換中會調用該Listener
awaitHealthy() :會等待所有的服務達到Running狀態
awaitStopped():會等待所有服務達到終止狀態
檢測類的方法有:

isHealthy() :如果所有的服務處於Running狀態、會返回True
servicesByState():以狀態為索引返回當前所有服務的快照
startupTimes() :返回一個Map對象,記錄被管理的服務啟動的耗時、以毫秒為單位,同時Map默認按啟動時間排序。
我們建議整個服務的生命周期都能通過ServiceManager來管理,不過即使狀態轉換是通過其他機制觸發的、也不影響ServiceManager方法的正確執行。例如:當一個服務不是通過startAsync()、而是其他機制啟動時,listeners 仍然可以被正常調用、awaitHealthy()也能夠正常工作。ServiceManager 唯一強制的要求是當其被創建時所有的服務必須處於New狀態。

參考:http://ifeve.com/google-guava-serviceexplained/


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM