相信你在日常的開發中肯定遇到過這種問題: 需要對實體類的狀態信息進行管理,比如一定時間后修改它為XXX狀態.
舉個例子: 訂單服務,當用戶提交了訂單后,如果在30分鍾內沒有支付,自動取消訂單,這就是一個對狀態的管理;
再舉一個我實際開發的例子: 消息管道的例子,用戶來拉取消息后,如果在30s內沒有提交,那么修改他的訂閱狀態為:未訂閱,這樣其他的實例可以建立連接繼續讀取.
整理設計圖:

核心就是: 一個Thread + 一個Queue;Thread不斷從隊列中取出數據, 如果隊列中為空或者里邊的任務沒到期,則線程卡住wait(timeOut).
二 詳細設計
先是簡單的有狀態的實體類:ConsumerInfoState,這個類的核心是狀態(訂閱到期時間),所以得有對狀態的查詢設置,查詢距到期還要多久等等....
import java.io.Serializable;
public class ConsumerInfoState implements Serializable {
/**
* 序列化ID
*/
private static final long serialVersionUID = 1L;
/**
* 過期時間20s
*/
protected long expiration;
private String topic;
private String userId;
private boolean isSubscribed = false;
private long CONSUMER_INSTANCE_TIMEOUT_MS_DEFAULT = 5000;
public ConsumerInfoState(String userId) {
this.userId = userId;
this.expiration = System.currentTimeMillis() + CONSUMER_INSTANCE_TIMEOUT_MS_DEFAULT;
}
public ConsumerInfoState(String topic, String userId) {
super();
this.topic = topic;
this.userId = userId;
this.expiration = System.currentTimeMillis() + CONSUMER_INSTANCE_TIMEOUT_MS_DEFAULT;
}
/**
*是否過期
*/
public boolean expired(long nowMs) {
return expiration <= nowMs;
}
/**
* <p>
* 更新訂閱過期時間
* </p>
*/
public void updateExpiration() {
this.expiration = System.currentTimeMillis() + CONSUMER_INSTANCE_TIMEOUT_MS_DEFAULT;
}
/**
* <p>
* 到指定時間還有多久
* </p>
*/
public long untilExpiration(long nowMs) {
return this.expiration - nowMs;
}
public String getUserId() {
return userId;
}
public String getTopic() {
return topic;
}
public void setTopic(String topic) {
this.topic = topic;
}
public void setSubscribed(boolean isSubscribed) {
this.isSubscribed = isSubscribed;
}
public boolean hasSubscribed() {
return isSubscribed;
}
}
這個類還是很清晰的..
核心類: ConsumerInfoManager
import java.util.Comparator;
import java.util.PriorityQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class ConsumerInfoManager {
Logger logger = LoggerFactory.getLogger(ConsumerInfoManager.class);
//任務隊列
private final PriorityQueue<ConsumerInfoState> consumersByExpiration = new PriorityQueue<ConsumerInfoState>(
new Comparator<ConsumerInfoState>() {
//小的在前
public int compare(ConsumerInfoState o1, ConsumerInfoState o2) {
if (o1.expiration < o2.expiration) {
return -1;
} else if (o1.expiration == o2.expiration) {
return 0;
} else {
return 1;
}
}
});
private ExpirationThread expirationThread;
public ConsumerInfoManager() {
//啟動線程
this.expirationThread = new ExpirationThread();
this.expirationThread.start();
}
//加入任務隊列
public synchronized void addConsumerInfoSate(ConsumerInfoState consumerInfoSate) {
consumersByExpiration.add(consumerInfoSate);
this.notifyAll();
}
@SuppressWarnings("unused")
public synchronized void updateExpiration(ConsumerInfoState state) {
// 先刪除在放里邊,重新排序
consumersByExpiration.remove(state);
state.updateExpiration();
consumersByExpiration.add(state);
this.notifyAll();
}
public void shutdown() {
logger.debug("Shutting down consumers");
expirationThread.shutdown();
synchronized (this) {
consumersByExpiration.clear();
}
}
/**
* <p>
* 檢查consumerInfo的過期時間,過期就從緩存中刪除
* </p>
* @author jiangyuechao 2018年1月13日 下午2:04:30
*/
@SuppressWarnings("unused")
private class ExpirationThread extends Thread {
AtomicBoolean isRunning = new AtomicBoolean(true);
CountDownLatch shutdownLatch = new CountDownLatch(1);
public ExpirationThread() {
super("Consumer Expiration Thread");
setDaemon(true);
}
@Override
public void run() {
synchronized (ConsumerInfoManager.this) {
try {
while (isRunning.get()) {
long now = System.currentTimeMillis();
//隊列空和最近一個任務是否到期的判斷
while (!consumersByExpiration.isEmpty() && consumersByExpiration.peek().expired(now)) {
final ConsumerInfoState state = consumersByExpiration.remove();
//{你自己的業務處理}
state.setSubscribed(false);
logger.info("任務已到期,topic:{}, userID:{},subscribed:{}",state.getTopic(),state.getUserId(),state.hasSubscribed());
}
//需要等待的時間
long timeout = consumersByExpiration.isEmpty() ? Long.MAX_VALUE
: consumersByExpiration.peek().untilExpiration(now);
ConsumerInfoManager.this.wait(timeout);
}
} catch (InterruptedException e) {
// Interrupted by other thread, do nothing to allow this thread to exit
logger.error("ExpirationThread線程中斷", e);
}
}
shutdownLatch.countDown();
}
public void shutdown() {
try {
isRunning.set(false);
this.interrupt();
shutdownLatch.await();
} catch (InterruptedException e) {
throw new Error("Interrupted when shutting down consumer worker thread.");
}
}
}
public void join(){
try {
expirationThread.join();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
代碼就這些,我進行了刪減,刪除了不重要的部分, 一般ConsumerInfoManager還需要一個緩存Cache,Cache中是存儲所有的實體類,queue中是Cache中的一部分,一般queue中的任務到期,需要從Cache中刪除或取出執行一些操作.
當然加Cache是復雜點的,核心思想就這些,額外的代碼就刪除了..
最后測試一下
public class ManagerTest {
static ConsumerInfoManager consumerInfoManager;
static String userId = "dhsajkdsajkdsjh1";
static Logger logger = LoggerFactory.getLogger(ManagerTest.class);
public static void main(String[] args) throws InterruptedException {
//實例化
setUp();
for(int i = 0;i<3;i++){
ConsumerInfoState consumerInfoState = new ConsumerInfoState("chao-"+i, userId);
consumerInfoState.setSubscribed(true);
consumerInfoManager.addConsumerInfoSate(consumerInfoState);
logger.info("任務"+i+"加入隊列");
Thread.sleep(1000);
}
consumerInfoManager.join();
}
public static void setUp(){
consumerInfoManager = new ConsumerInfoManager();
}
}
輸出結果: 符合預期...
2018-01-17 10:07:27,450 [main] INFO ManagerTest - 任務0加入隊列
2018-01-17 10:07:28,451 [main] INFO ManagerTest - 任務1加入隊列
2018-01-17 10:07:29,451 [main] INFO ManagerTest - 任務2加入隊列
2018-01-17 10:07:32,451 [Consumer Expiration Thread] INFO ConsumerInfoManager - 任務已到期,topic:chao-0, userID:dhsajkdsajkdsjh1,subscribed:false
2018-01-17 10:07:33,485 [Consumer Expiration Thread] INFO ConsumerInfoManager - 任務已到期,topic:chao-1, userID:dhsajkdsajkdsjh1,subscribed:false
2018-01-17 10:07:34,452 [Consumer Expiration Thread] INFO ConsumerInfoManager - 任務已到期,topic:chao-2, userID:dhsajkdsajkdsjh1,subscribed:false
