
實現Futrue接口
public class MsgFuture<V> implements java.util.concurrent.Future<V> {
...
...
}
Future的主要特性為Future.get()、
get()
get(long timeout, TimeUnit unit)
主要思路如下:
構造MsgFuture時,設置開始時間,這里是sendTime;設置timeout,默認get()方法的超時時間,我們的程序不可能會無限等待
默認的get()對應的值域是result,默認為一個NULL對象,標識沒有返回數據
result的值需要其他線程在做完任務后將值寫到Future對象中,這里暴露了一個方法setResult(object)
/**
* 設置結果值result,喚醒condition {@link #get(long, TimeUnit)}
* @param result
*/
public synchronized void setResult(Object result) {
reentrantLock.lock();
try {
this.result = result;
condition.signalAll();
}finally {
reentrantLock.unlock();
}
}
使用ReentrantLock來進行數據可見性控制
condition.signalAll()可以喚醒condition.await的阻塞wait
至於其他線程如何調用到setResult(object)方法,可以使用ConcurrentHashMap,key為msgId,值為MsgFuture對象,設置成一個全局的,或兩個線程都可訪問,其他線程根據msgId獲取到MsgFuture,然后調用setResult(object)方法
/**
* 獲取結果,如果到達timeout還未得到結果,則會拋出TimeoutException
* @param timeout
* @param unit
* @return
* @throws InterruptedException
* @throws TimeoutException
*/
@SuppressWarnings("all")
public V get(long timeout, TimeUnit unit) throws InterruptedException, TimeoutException {
long left = getLeftTime(timeout, unit); //根據timeout配置獲取剩余的世界
if(left < 0){
//已經沒有剩余時間
if(isDone()){ //如果已經完成,直接放回結果
return (V)this.result;
}else{
//timeout
throw new TimeoutException("返回超時,后續的響應將會被丟棄abort");
}
}else{
reentrantLock.lock(); //同步
try {
//獲取鎖后先判斷是否已經完成,防止無意義的await
if(isDone()){ //先判斷是否已經完成
return (V)this.result; //直接返回
}
logger.debug("await "+left+" ms");
condition.await(getLeftTime(timeout, unit), TimeUnit.MILLISECONDS); //沒有返回,阻塞等待,如果condition被喚醒,也會提前退出
}finally {
reentrantLock.unlock();
}
if(isDone()){ //被喚醒或超時時間已到,嘗試判斷是否完成
return (V)this.result; //返回
}
throw new TimeoutException("未獲取到結果"); //超時
}
}
public boolean isDone() {
return this.result != NULL;
}
全部代碼
public class MsgFuture<V> implements java.util.concurrent.Future<V> {
private final static Logger logger = LoggerFactory.getLogger(MsgFuture.class);
/**
* 全局的空對象,如果Future獲取到值了,那么一定不是NULL
*/
private final static Object NULL = new Object();
/**
* 主鎖
*/
private final ReentrantLock reentrantLock = new ReentrantLock();
/**
* 條件,利用它的condition.await(left, TimeUnit.MILLISECONDS)和notifyAll方法來實現阻塞、喚醒
*/
private final Condition condition = reentrantLock.newCondition();
private int timeout;
private volatile Object result = NULL;
private long sendTime;
public MsgFuture(int timeout, long sendTime) {
this.timeout = timeout;
this.sendTime = sendTime;
}
public boolean cancel(boolean mayInterruptIfRunning) {
return false;
}
public boolean isCancelled() {
return false;
}
public boolean isDone() {
return this.result != NULL;
}
/**
* 獲取future結果
* @return
* @throws InterruptedException
*/
public V get() throws InterruptedException {
logger.debug("sendTime:{}",sendTime);
try {
return get(timeout, TimeUnit.MILLISECONDS);
} catch (TimeoutException e) {
logger.error("獲取future結果異常", e);
}
return null;
}
/**
* 獲取結果,如果到達timeout還未得到結果,則會拋出TimeoutException
* @param timeout
* @param unit
* @return
* @throws InterruptedException
* @throws TimeoutException
*/
@SuppressWarnings("all")
public V get(long timeout, TimeUnit unit) throws InterruptedException, TimeoutException {
long left = getLeftTime(timeout, unit);
if(left < 0){
//已經沒有剩余時間
if(isDone()){
return (V)this.result;
}else{
//timeout
throw new TimeoutException("返回超時,后續的響應將會被丟棄abort");
}
}else{
reentrantLock.lock();
try {
//獲取鎖后先判斷是否已經完成,防止無意義的await
if(isDone()){
return (V)this.result;
}
logger.debug("await "+left+" ms");
condition.await(getLeftTime(timeout, unit), TimeUnit.MILLISECONDS);
}finally {
reentrantLock.unlock();
}
if(isDone()){
return (V)this.result;
}
throw new TimeoutException("未獲取到結果");
}
}
/**
* 設置結果值result,喚醒condition {@link #get(long, TimeUnit)}
* @param result
*/
public synchronized void setResult(Object result) {
reentrantLock.lock();
try {
this.result = result;
condition.signalAll();
}finally {
reentrantLock.unlock();
}
}
/**
* 計算剩余時間
* @param timeout
* @param unit
* @return
*/
private long getLeftTime(long timeout, TimeUnit unit){
long now = System.currentTimeMillis();
timeout = unit.toMillis(timeout); // 轉為毫秒
return timeout - (now - sendTime);
}
/*public static void main(String[] args) {
MsgFuture msgFuture = new MsgFuture(2000,System.currentTimeMillis());
//測試先喚醒、后get是否正常
msgFuture.setResult("yoxi");
try {
System.out.println(msgFuture.get(2000,TimeUnit.MILLISECONDS));
} catch (InterruptedException e) {
logger.error("Interrupt異常", e);
} catch (TimeoutException e) {
logger.error("測試先喚醒,后get出錯", e);
}
}*/
}
