Netty中的Future


先看下Future的整個繼承體系,還有一個ChannelFuture不在里面;

    在並發編程中,我們通常會用到一組非阻塞的模型:Promise,Future 和 Callback。其中的 Future 表示一個可能還沒有實際完成的異步任務的結果,針對這個結果可以添加 Callback 以便在任務執行成功或失敗后做出對應的操作,而 Promise 交由任務執行者,任務執行者通過 Promise 可以標記任務完成或者失敗。 可以說這一套模型是很多異步非阻塞架構的基礎。
    這一套經典的模型在 Scala、C# 中得到了原生的支持,但 JDK 中暫時還只有無 Callback 的 Future 出現,當然也並非在 JAVA 界就沒有發展了,比如 Guava 就提供了ListenableFuture 接口,而 Netty 4+ 更是提供了完整的 Promise、Future 和 Listener 機制,在 Netty 的官方文檔 Using as a generic library 中也介紹了將 Netty 作為一個 lib 包依賴,並且使用 Listenable futures 的示例。在實際的項目使用中,發現 Netty 的 EventLoop 機制不一定適用其他場景,因此想去除對 EventLoop 的依賴,實現一個簡化版本。
    Netty自己實現了一套並發庫,Future是其中的一塊,下篇文章講下他的並發庫的線程池實現。Netty的Future的特性
  • Future<V>的V為異步結果的返回類型
  • getNow 是無阻塞調用,返回異步執行結果,如果未完成那么返回null
  • await 是阻塞調用,等到異步執行完成
  • isSuccess 執行成功是否成功
  • sync  阻塞調用,等待這個future直到isDone(可能由於正常終止、異常或取消而完成)返回true; 如果該future失敗,重新拋出失敗的原因。 和await區別就是返回結果不同,它返回一個Future對象,通過這個Future知道任務執行結果。
  • 添加GenericFutureListener, 執行完成(future可能由於正常終止、異常或取消而完成)后調用該監聽器。
如果我實現這個Future怎么實現:1:任務執行器,這樣我們可以控制任務的執行了。2:執行結果,用於返回。3:監聽器集合。4:執行結果狀態
5:觸發監聽器的函數。那么 怎么執行完成后自定觸發監聽器,應該在Future里面又啟動了一個線程去執行這個觸發機制。這些都是我的猜想。看下他的子類怎么實現的:
 
AbstractFuture實現了JDK中的get方法,調用netty中future的方法,一個模板模式出現了。所有的Future都會繼承該類
Promise:任務執行者可以標記任務執行成功或失敗,添加了setFailure(Throwable)可以知道Promise的子類需要有個成員變量來保存異常,添加了setSuccess(V) 方法,setUncancellable()方法, 和tryFailure和trySuccess方法。這樣所有的操作都可以返回一個Promise,你自己檢測是否執行成功,好處是啥,接口統一嗎?
ScheduledFuture:一個定時任務的執行結果
ProgressiveFuture:可以跟蹤任務的執行進度。
 
下面看下幾個類的具體實現:
DefaultPromise:成員變量如下:
  1. privatefinalEventExecutor executor; //任務執行器
  2. privatevolatileObject result;//不僅僅是結果,也有可能是異常
  3. * 一個或多個監聽器,可能是GenericFutureListener或者DefaultFutureListeners。如果是NULL有兩種可能
    * 1:沒有添加觸發器
    * 2:已經出發了
  4. privateObject listeners;
  5. privateLateListeners lateListeners;
  6. privateshort waiters;
看來異常也是結果。
看一個重要方法的實現:
isDone: 可能由於正常終止、異常或取消而完成
  1. privatestaticboolean isDone0(Object result){
  2. return result !=null&& result != UNCANCELLABLE;
  3. }
isSuccess: 任務執行成功
  1. publicboolean isSuccess(){
  2. Object result =this.result;
  3. if(result ==null|| result == UNCANCELLABLE){
  4. returnfalse;
  5. }
  6. return!(result instanceofCauseHolder);
  7. }
getNow:返回執行結果
  1. public V getNow(){
  2. Object result =this.result;
  3. if(result instanceofCauseHolder|| result == SUCCESS){
  4. returnnull;
  5. }
  6. return(V) result;
  7. }
sync:同步阻塞方法
  1. @Override
  2. publicPromise<V> sync()throwsInterruptedException{
  3. await();
  4. rethrowIfFailed();
  5. returnthis;
  6. }
看來比await多了拋出異常。
await:等待任務執行完成
  1. @Override
  2. publicPromise<V> await()throwsInterruptedException{
  3. if(isDone()){
  4. returnthis;
  5. }
  6. if(Thread.interrupted()){
  7. thrownewInterruptedException(toString());
  8. }
  9. synchronized(this){
  10. while(!isDone()){
  11. checkDeadLock();//判斷當前線程是否是執行線程。如果是拋出異常。
  12. incWaiters();//添加等待個數
  13. try{
  14. wait();//釋放鎖,等待喚醒,阻塞該線程
  15. }finally{
  16. decWaiters();
  17. }
  18. }
  19. }
  20. returnthis;
  21. }
執行器所在的線程不能調用await(),只能是調用者所在的線程才可以,waiters有什么用呢?
cancle方法使用:
  1. @Override
  2. publicboolean cancel(boolean mayInterruptIfRunning){
  3. Object result =this.result;
  4. if(isDone0(result)|| result == UNCANCELLABLE){
  5. returnfalse;
  6. }
  7. synchronized(this){
  8. // Allow only once.
  9. result =this.result;
  10. if(isDone0(result)|| result == UNCANCELLABLE){
  11. returnfalse;
  12. }
  13. this.result = CANCELLATION_CAUSE_HOLDER;
  14. if(hasWaiters()){
  15. notifyAll();
  16. }
  17. }
  18. notifyListeners();
  19. returntrue;
  20. }
當我們修改DefaultPromise的狀態時,要觸發監聽器。
notifyListeners:
  1. /**
  2. * 該方法不需要異步,為啥呢
  3. * 1:這個方法在同步代碼塊里面調用,因此任何監聽器列表的改變都happens-before該方法
  4. * 2:該方法只有isDone==true的時候調用,一但 isDone==true 那么監聽器列表將不會改變
  5. */
  6. privatevoid notifyListeners(){
  7. Object listeners =this.listeners;
  8. if(listeners ==null){
  9. return;
  10. }
  11. EventExecutor executor = executor();
  12. if(executor.inEventLoop()){
  13. finalInternalThreadLocalMap threadLocals =InternalThreadLocalMap.get();
  14. finalint stackDepth = threadLocals.futureListenerStackDepth();
  15. if(stackDepth < MAX_LISTENER_STACK_DEPTH){
  16. threadLocals.setFutureListenerStackDepth(stackDepth +1);
  17. try{
  18. if(listeners instanceofDefaultFutureListeners){
  19. notifyListeners0(this,(DefaultFutureListeners) listeners);
  20. }else{
  21. finalGenericFutureListener<?extendsFuture<V>> l =
  22. (GenericFutureListener<?extendsFuture<V>>) listeners;
  23. notifyListener0(this, l);
  24. }
  25. }finally{
  26. this.listeners =null;
  27. threadLocals.setFutureListenerStackDepth(stackDepth);
  28. }
  29. return;
  30. }
  31. }
  32. if(listeners instanceofDefaultFutureListeners){
  33. finalDefaultFutureListeners dfl =(DefaultFutureListeners) listeners;
  34. execute(executor,newRunnable(){
  35. @Override
  36. publicvoid run(){
  37. notifyListeners0(DefaultPromise.this, dfl);
  38. DefaultPromise.this.listeners =null;
  39. }
  40. });
  41. }else{
  42. finalGenericFutureListener<?extendsFuture<V>> l =
  43. (GenericFutureListener<?extendsFuture<V>>) listeners;
  44. execute(executor,newRunnable(){
  45. @Override
  46. publicvoid run(){
  47. notifyListener0(DefaultPromise.this, l);
  48. DefaultPromise.this.listeners =null;
  49. }
  50. });
  51. }
  52. }
任務永遠不能在主線程中執行,需要放到執行器所在的線程執行。 DefaultFutureListeners和 GenericFutureListener,一個是容器,一個是元素
還有幾個修改狀態的方法:
  1. @Override
  2. publicboolean setUncancellable(){
  3. Object result =this.result;
  4. if(isDone0(result)){
  5. return!isCancelled0(result);
  6. }
  7. synchronized(this){
  8. // Allow only once.
  9. result =this.result;
  10. if(isDone0(result)){
  11. return!isCancelled0(result);
  12. }
  13. this.result = UNCANCELLABLE;
  14. }
  15. returntrue;
  16. }
  17. privateboolean setFailure0(Throwable cause){
  18. if(cause ==null){
  19. thrownewNullPointerException("cause");
  20. }
  21. if(isDone()){
  22. returnfalse;
  23. }
  24. synchronized(this){
  25. // Allow only once.
  26. if(isDone()){
  27. returnfalse;
  28. }
  29. result =newCauseHolder(cause);
  30. if(hasWaiters()){
  31. notifyAll();
  32. }
  33. }
  34. returntrue;
  35. }
  36. privateboolean setSuccess0(V result){
  37. if(isDone()){
  38. returnfalse;
  39. }
  40. synchronized(this){
  41. // Allow only once.
  42. if(isDone()){
  43. returnfalse;
  44. }
  45. if(result ==null){
  46. this.result = SUCCESS;
  47. }else{
  48. this.result = result;
  49. }
  50. if(hasWaiters()){
  51. notifyAll();
  52. }
  53. }
  54. returntrue;
  55. }
CompleteFuture的幾個子類是狀態Promise
 
PromiseTask:該類繼承了RunnableFuture接口,該類表示異步操作的結果也可以異步獲得,類似JDK中的FutureTask,實例化該對象時候需要傳一個Callable的對象,如果沒有該對象可以傳遞一個Runnable和一個Result構造一個Callable對象。
  1. privatestaticfinalclassRunnableAdapter<T>implementsCallable<T>{
  2. finalRunnable task;
  3. final T result;
  4. RunnableAdapter(Runnable task, T result){
  5. this.task = task;
  6. this.result = result;
  7. }
  8. @Override
  9. public T call(){
  10. task.run();
  11. return result;
  12. }
  13. @Override
  14. publicString toString(){
  15. return"Callable(task: "+ task +", result: "+ result +')';
  16. }
  17. }
看下他的run方法。
  1. @Override
  2. publicvoid run(){
  3. try{
  4. if(setUncancellableInternal()){
  5. V result = task.call();
  6. setSuccessInternal(result);
  7. }
  8. }catch(Throwable e){
  9. setFailureInternal(e);
  10. }
該類的setFailure,setSuccess等方法都會拋出一異常,而加了internal的方法會成功執行,他們是protected,子類或者同一個package中可以調用。
ScheduledFutureTask:該類是定時任務返回的 ChannelFuture是這個結構中最重要的類,從名稱可以知道是通道異步執行的結果:在netty中所有的IO操作都是異步的。這意味這所有的IO調用都會立即返回,且不保證IO操作完成。
IO調用會返回一個ChannelFuture的實例,通過該實例可以查看IO操作的結果和狀態,
ChannelFuture有完成和未完成兩種狀態,當IO操作開始,就會創建一個ChannelFuture的實例,該實例初始是未完成狀態,它不是成功,失敗,或者取消,因為IO操作還沒有完成,如果IO操作完成了那么將會有成功,失敗,和取消狀態,
* +---------------------------+
* | Completed successfully |
* +---------------------------+
* +----> isDone() = <b>true</b> |
* +--------------------------+ | | isSuccess() = <b>true</b> |
* | Uncompleted | | +===========================+
* +--------------------------+ | | Completed with failure |
* | isDone() = <b>false</b> | | +---------------------------+
* | isSuccess() = false |----+----> isDone() = <b>true</b> |
* | isCancelled() = false | | | cause() = <b>non-null</b> |
* | cause() = null | | +===========================+
* +--------------------------+ | | Completed by cancellation |
* | +---------------------------+
* +----> isDone() = <b>true</b> |
* | isCancelled() = <b>true</b> |
* +---------------------------+
 該類提供了很多方法用來檢查IO操作是否完成,等待完成,和接受IO操作的結果。還可以添加ChannelFutureListener的監聽器,這樣IO操作完成時就可以得到提醒 * 強烈建議使用addListener而不是await。 * addListener是非阻塞的,它簡單的添加指定的ChannelFutureListener到ChannelFuture中, * IO線程將在當綁定在這個future的IO操作完成時,觸發這個觸發器,優點是提高效率和資源的利用率 * await()是一個阻塞方法,一旦調用,調用線程將會阻塞直到IO操作完成。優點是容易實現順序邏輯

 
 
 
 
 
 
 
 






免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM