最近在學習了下Google的Guava包,發現這真是一個好東西啊。。由於平時也會寫一些基於多線程的東西,所以特意了解了下這個Service框架。這里Guava包里的Service接口用於封裝一個服務對象的運行狀態、包括start和stop等方法。例如web服務器,RPC服務器、計時器等可以實現這個接口。對此類服務的狀態管理並不輕松、需要對服務的開啟/關閉進行妥善管理、特別是在多線程環境下尤為復雜。Guava包提供了一些基礎類幫助你管理復雜的狀態轉換邏輯和同步細節。
概述
一個服務正常生命周期有:
- Service.State.NEW
- Service.State.STARTING
- Service.State.RUNNING
- Service.State.STOPPING
- Service.State.TERMINATED
服務一旦被停止就無法再重新啟動了。如果服務在starting、running、stopping狀態出現問題、會進入Service.State.FAILED.狀態。調用 startAsync()方法可以異步開啟一個服務,同時返回this對象形成方法調用鏈。注意:只有在當前服務的狀態是NEW時才能調用startAsync()方法,因此最好在應用中有一個統一的地方初始化相關服務。停止一個服務也是類似的、使用異步方法stopAsync() 。但是不像startAsync(),多次調用這個方法是安全的。這是為了方便處理關閉服務時候的鎖競爭問題。
Service也提供了一些方法用於等待服務狀態轉換的完成:通過 addListener()方法異步添加監聽器。此方法允許你添加一個 Service.Listener 、它會在每次服務狀態轉換的時候被調用。注意:最好在服務啟動之前添加Listener(這時的狀態是NEW)、否則之前已發生的狀態轉換事件是無法在新添加的Listener上被重新觸發的。
同步使用awaitRunning()。這個方法不能被打斷、不強制捕獲異常、一旦服務啟動就會返回。如果服務沒有成功啟動,會拋出IllegalStateException異常。同樣的, awaitTerminated() 方法會等待服務達到終止狀態(TERMINATED 或者 FAILED)。兩個方法都有重載方法允許傳入超時時間。
Service 接口本身實現起來會比較復雜、且容易碰到一些捉摸不透的問題。因此我們不推薦直接實現這個接口。而是請繼承Guava包里已經封裝好的基礎抽象類。每個基礎類支持一種特定的線程模型。
Service接口的實現
AbstractIdleService
AbstractIdleService在我們服務處於running狀態時,不會做執行任何動作,我們僅僅只有在startup和shutdown的時候才執行一些動作,所以我們在實現這個方法時,只是簡單的實現startUp() 和 shutDown() 這兩個方法即可,在startUp方法中做一些比如初始化,注冊等操作,在shutDown中做一些清理操作等。舉個例子,也就是官網的例子:
protected void startUp() {
servlets.add(new GcStatsServlet());
}
protected void shutDown() {}
我們在startUp()方法的時候,實例化了一個GcStatsServlet,當我們在運行的時候,會有現成的線程處理這個Servlet,所以在服務運行時就不需要做什么額外動作了。這個比較簡單,就不舉例子了,應該用的情況應該不會很多吧?。。。。
AbstractExecutionThreadService
AbstractExecutionThreadService在單個線程中執行startup, running, and shutdown,我們必須實現run()方法,同事在方法中要能響應停止服務的請求,比如在一個循環中:
import com.google.common.util.concurrent.AbstractExecutionThreadService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.Uninterruptibles;
import java.util.concurrent.TimeUnit;
/**
* User: hupeng
* Date: 14-12-22
* Time: 下午10:17
*/
public class AbstractExecutionThreadServiceTest extends AbstractExecutionThreadService {
private volatile boolean running = true; //聲明一個狀態
@Override
protected void startUp() {
////在這里我們可以做一些初始化操作
}
@Override
public void run() {
while (running) {
// do our work
try {
Uninterruptibles.sleepUninterruptibly(2, TimeUnit.SECONDS);
System.out.println("do our work.....");
} catch (Exception e) {
//處理異常,這里如果拋出異常,會使服務狀態變為failed同時導致任務終止。
}
}
}
@Override
protected void triggerShutdown() {
running = false; //這里我們改變狀態值,run方法中就能夠得到響應。=
//可以做一些清理操作,也可以移到shutDown()方法中執行
}
@Override
protected void shutDown() throws Exception {
//可以關閉資源,關閉連接。。。
}
public static void main(String[] args) throws InterruptedException {
AbstractExecutionThreadServiceTest service = new AbstractExecutionThreadServiceTest();
service.addListener(new Listener() {
@Override
public void starting() {
System.out.println("服務開始啟動.....");
}
@Override
public void running() {
System.out.println("服務開始運行");;
}
@Override
public void stopping(State from) {
System.out.println("服務關閉中");
}
@Override
public void terminated(State from) {
System.out.println("服務終止");
}
@Override
public void failed(State from, Throwable failure) {
System.out.println("失敗,cause:" + failure.getCause());
}
}, MoreExecutors.directExecutor());
service.startAsync().awaitRunning();
System.out.println("服務狀態為:" + service.state());
Thread.sleep(10 * 1000);
service.stopAsync().awaitTerminated();
System.out.println("服務狀態為:" + service.state());
}
}
triggerShutdown() 方法會在執行方法stopAsync調用,startUp方法會在執行startAsync方法時調用,這個類的實現都是委托給AbstractService這個方法實現的。。具體代碼可以自己看一下
AbstractScheduledService
AbstractScheduledService類用於在運行時處理一些周期性的任務。子類可以實現 runOneIteration()方法定義一個周期執行的任務,以及相應的startUp()和shutDown()方法。為了能夠描述執行周期,你需要實現scheduler()方法。通常情況下,你可以使用AbstractScheduledService.Scheduler類提供的兩種調度器:newFixedRateSchedule(initialDelay, delay, TimeUnit) 和newFixedDelaySchedule(initialDelay, delay, TimeUnit),類似於JDK並發包中ScheduledExecutorService類提供的兩種調度方式。如要自定義schedules則可以使用 CustomScheduler類來輔助實現。一個實現類看起來應該是這樣的
import com.google.common.util.concurrent.AbstractScheduledService;
import com.google.common.util.concurrent.MoreExecutors;
import java.util.concurrent.TimeUnit;
/**
* User: hupeng
* Date: 14-12-22
* Time: 下午7:43
*/
public class AbstractScheduledServiceTest extends AbstractScheduledService {
@Override
protected void startUp() throws Exception {
}
@Override
protected void shutDown() throws Exception {
}
@Override
protected void runOneIteration() throws Exception {
// //處理異常,這里如果拋出異常,會使服務狀態變為failed同時導致任務終止
try {
System.out.println("do work....");
} catch (Exception e) {
//處理異常
}
}
@Override
protected Scheduler scheduler() {
return Scheduler.newFixedDelaySchedule(1, 5, TimeUnit.SECONDS);
}
public static void main(String[] args) throws InterruptedException {
AbstractScheduledServiceTest service = new AbstractScheduledServiceTest();
service.addListener(new Listener() {
@Override
public void starting() {
System.out.println("服務開始啟動.....");
}
@Override
public void running() {
System.out.println("服務開始運行");
;
}
@Override
public void stopping(State from) {
System.out.println("服務關閉中");
}
@Override
public void terminated(State from) {
System.out.println("服務終止");
}
@Override
public void failed(State from, Throwable failure) {
System.out.println("失敗,cause:" + failure.getCause());
}
}, MoreExecutors.directExecutor());
service.startAsync().awaitRunning();
System.out.println("服務狀態為:" + service.state());
Thread.sleep(10 * 1000);
service.stopAsync().awaitTerminated();
System.out.println("服務狀態為:" + service.state());
}
}
當然這個Listener的注冊只是為了測試觀察。AbstractScheduledServic默認使用Executors.newSingleThreadScheduledExecutor來執行的
/**
* Returns the {@link ScheduledExecutorService} that will be used to execute the {@link #startUp},
* {@link #runOneIteration} and {@link #shutDown} methods. If this method is overridden the
* executor will not be {@linkplain ScheduledExecutorService#shutdown shutdown} when this
* service {@linkplain Service.State#TERMINATED terminates} or
* {@linkplain Service.State#TERMINATED fails}. Subclasses may override this method to supply a
* custom {@link ScheduledExecutorService} instance. This method is guaranteed to only be called
* once.
*
* <p>By default this returns a new {@link ScheduledExecutorService} with a single thread thread
* pool that sets the name of the thread to the {@linkplain #serviceName() service name}.
* Also, the pool will be {@linkplain ScheduledExecutorService#shutdown() shut down} when the
* service {@linkplain Service.State#TERMINATED terminates} or
* {@linkplain Service.State#TERMINATED fails}.
*/
protected ScheduledExecutorService executor() {
final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(
new ThreadFactory() {
@Override public Thread newThread(Runnable runnable) {
return MoreExecutors.newThread(serviceName(), runnable);
}
});
// Add a listener to shutdown the executor after the service is stopped. This ensures that the
// JVM shutdown will not be prevented from exiting after this service has stopped or failed.
// Technically this listener is added after start() was called so it is a little gross, but it
// is called within doStart() so we know that the service cannot terminate or fail concurrently
// with adding this listener so it is impossible to miss an event that we are interested in.
addListener(new Listener() {
@Override public void terminated(State from) {
executor.shutdown();
}
@Override public void failed(State from, Throwable failure) {
executor.shutdown();
}
}, directExecutor());
return executor;
}
你可以參照這個實現override這個方法,得到你想要的ScheduledExecutorService。
AbstractService
如需要自定義的線程管理、可以通過擴展 AbstractService類來實現。一般情況下、使用上面的幾個實現類就已經滿足需求了,但如果在服務執行過程中有一些特定的線程處理需求、則建議繼承AbstractService類。
繼承AbstractService方法必須實現兩個方法.
doStart(): 首次調用startAsync()時會同時調用doStart(),doStart()內部需要處理所有的初始化工作、如果啟動成功則調用notifyStarted()方法;啟動失敗則調用notifyFailed()
doStop(): 首次調用stopAsync()會同時調用doStop(),doStop()要做的事情就是停止服務,如果停止成功則調用 notifyStopped()方法;停止失敗則調用 notifyFailed()方法。
doStart和doStop方法的實現需要考慮下性能,盡可能的低延遲。如果初始化的開銷較大,如讀文件,打開網絡連接,或者其他任何可能引起阻塞的操作,建議移到另外一個單獨的線程去處理。
使用ServiceManager
除了對Service接口提供基礎的實現類,Guava還提供了 ServiceManager類使得涉及到多個Service集合的操作更加容易。通過實例化ServiceManager類來創建一個Service集合,你可以通過以下方法來管理它們:
startAsync() : 將啟動所有被管理的服務。如果當前服務的狀態都是NEW的話、那么你只能調用該方法一次、這跟 Service#startAsync()是一樣的。
stopAsync() :將停止所有被管理的服務。
addListener :會添加一個ServiceManager.Listener,在服務狀態轉換中會調用該Listener
awaitHealthy() :會等待所有的服務達到Running狀態
awaitStopped():會等待所有服務達到終止狀態
檢測類的方法有:
isHealthy() :如果所有的服務處於Running狀態、會返回True
servicesByState():以狀態為索引返回當前所有服務的快照
startupTimes() :返回一個Map對象,記錄被管理的服務啟動的耗時、以毫秒為單位,同時Map默認按啟動時間排序。
我們建議整個服務的生命周期都能通過ServiceManager來管理,不過即使狀態轉換是通過其他機制觸發的、也不影響ServiceManager方法的正確執行。例如:當一個服務不是通過startAsync()、而是其他機制啟動時,listeners 仍然可以被正常調用、awaitHealthy()也能夠正常工作。ServiceManager 唯一強制的要求是當其被創建時所有的服務必須處於New狀態。