時間輪
參考: https://github.com/wolaiye1010/zdc-java-script/
參考: https://www.cnblogs.com/zhongwencool/p/timing_wheel.html
為什么要用時間輪實現
- 通常用於實現linux內核任務、游戲類的buf計時。
- 單個時間輪局限性:保存的任務數量少,不能超過當前時間輪。
- 多層時間輪,典型:日-時-分-秒
- 傳統java實現定時:Timer,只能單線程,會阻塞; Executors.newScheduledThreadPoll, 使用的最小堆來實現,任務還是不能太多,添加時間復雜度為O(logn)
時間輪簡單實現
1.實現日、時、分、秒、毫秒時間輪,設置Tick間隔,tick=100ms
2.創建對應的時間輪數組,ms=1000/tick,s=60,min=60,hour=24,day=365,數組中的每個元素類型為List
3.添加定時任務(不能添加到當前tick區間)
3.1 獲取當前時間輪時間 curTime, 獲取定時任務delayTime
3.2使用curTime+delayTime獲得任務下一次執行的事件,並判斷是否需要時間輪升級(ms->s->min->hour->day)
3.3將該定時任務添加到升級后時間輪的具體索引上
4.時間輪-one Tick增加
4.1 每一個tick間隔,將會進行一次one-tick增加,會觸發新的tick間隔內的任務,如果該任務period大於0,則繼續添加該定時任務,參考3
4.2 每一層的時間輪完成后都會進行,時間輪升級(例如59s,增加到下一輪,現獲取下一輪min的所有任務,然后再更新新的min對應s的時間輪),升級后新的時間輪任務就會在具體的降級時間輪中進行定位添加;
時間輪算法實現-參考https://github.com/wolaiye1010/zdc-java-script/ 加了些注釋,增加取消
public class TimeWheelService {
public static void main(String[] args) {
TimeWheelService timeWheelService = new TimeWheelService(3);
timeWheelService.schedule(()->{
for(int i=0;i<100;i++){
final int a=i;
timeWheelService.schedule(()-> System.out.println("^^^^^^buff-"+a),100,80);
}
},100,0);
timeWheelService.schedule(()->{
for(int i=0;i<100;i++){
final int a=i;
timeWheelService.schedule(()-> System.out.println("=====>debuff-"+a),100,100);
}
},100,0);
timeWheelService.schedule(()-> System.out.println(new Date()),10,1000);
}
private MultiTimeWheel timeWheel=new MultiTimeWheel();
private TimeWheelThread timeWheelThread=new TimeWheelThread();
//每輪的時間輪長度
private static final int TICK=10;
private static final int wheelIndexMillisecondLength=1000/TICK;
private static final int wheelIndexSecondLength=60;
private static final int wheelIndexMinuteLength=60;
private static final int wheelIndexHourLength=24;
private static final int wheelIndexDayLength=365;
//每一輪對應的所有ticks
private static final long wheelMillisecondAllTicks=1L;
//1s 10格
private static final long wheelSecondAllTicks=wheelMillisecondAllTicks*wheelIndexMillisecondLength;
//1min 600
private static final long wheelMinuteAllTicks=wheelSecondAllTicks*wheelIndexSecondLength;
//1h
private static final long wheelHourAllTicks=wheelMinuteAllTicks*wheelIndexMinuteLength;
//1day
private static final long wheelDayAllTicks=wheelHourAllTicks*wheelIndexHourLength;
//每一輪當前的索引,可以精確獲取時間
private AtomicInteger wheelIndexMillisecond=new AtomicInteger(0);
private AtomicInteger wheelIndexSecond=new AtomicInteger(0);
private AtomicInteger wheelIndexMinute=new AtomicInteger(0);
private AtomicInteger wheelIndexHour=new AtomicInteger(0);
private AtomicInteger wheelIndexDay=new AtomicInteger(0);
//實際存儲
private volatile Vector[] wheelMillisecond=new Vector[wheelIndexMillisecondLength];
private volatile Vector[] wheelSecond=new Vector[wheelIndexSecondLength];
private volatile Vector[] wheelMinute=new Vector[wheelIndexMinuteLength];
private volatile Vector[] wheelHour=new Vector[wheelIndexHourLength];
private volatile Vector[] wheelDay =new Vector[wheelIndexDayLength];
public void schedule(Runnable runnable,long delay,long period){
if(period<TICK && period>0) throw new RuntimeException("不能使得間隔周期小於時間片TICK:"+TICK+" ms,間隔周期可以為0ms");
synchronized(this){
TimeWheelTask timeWheelTask = new TimeWheelTask(delay, period, runnable);
schedule(timeWheelTask);
}
}
public void schedule(TimeWheelTask timeWheelTask){
//delay 加上當前相對於具體時間單位的余數。
//處理當前是0分59s時加入了1分1秒后任務,會導致在1分1秒時候執行,因此如果延遲本身大於當前的一輪周期,則用延遲加上當前時間與本輪毫秒值的余數
//00:00:59 + 61 = 00:02:00,可知,需要先加上本輪余數
timeWheelTask.delay=timeWheelTask.delay+timeWheel.getWheelNowTime(timeWheelTask.delay);
timeWheel.addTaskToWheel(timeWheelTask.delay,timeWheelTask);
}
//真正執行定時任務的線程池
private ThreadPoolExecutor threadPoolExecutor;
public TimeWheelService(int coreSize){
this.threadPoolExecutor=new ThreadPoolExecutor(coreSize,coreSize,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>(10000));
timeWheelThread.start();
}
//輪子
class MultiTimeWheel{
/**
* 增加 one-tick,可能會觸發每層輪,時間輪的升級
*/
public void incrTick() {
if(incIndex(TimeUnit.MILLISECONDS)){
return;
}
if(incIndex(TimeUnit.SECONDS)){
return;
}
if(incIndex(TimeUnit.MINUTES)){
return;
}
if(incIndex(TimeUnit.HOURS)){
return;
}
incIndex(TimeUnit.DAYS);
}
//增加一個tick,處理因為升級導致的新事件添加
private boolean incIndex(TimeUnit timeUnit){
long allTicksNext;
Vector[] vectorsNext;
AtomicInteger index;
AtomicInteger indexNext;
int wheelLength;
int wheelLengthNext;
switch(timeUnit){
case DAYS:
allTicksNext=0;
vectorsNext=null;
index=wheelIndexDay;
indexNext=null;
wheelLength=wheelIndexDayLength;
wheelLengthNext=0;
break;
case HOURS:
allTicksNext=wheelDayAllTicks;
vectorsNext=wheelDay;
index=wheelIndexHour;
indexNext=wheelIndexDay;
wheelLength=wheelIndexHourLength;
wheelLengthNext=wheelIndexDayLength;
break;
case MINUTES:
allTicksNext=wheelHourAllTicks;
vectorsNext=wheelHour;
index=wheelIndexMinute;
indexNext=wheelIndexHour;
wheelLength=wheelIndexMinuteLength;
wheelLengthNext=wheelIndexHourLength;
break;
case SECONDS:
allTicksNext=wheelMinuteAllTicks;
vectorsNext=wheelMinute;
index=wheelIndexSecond;
indexNext=wheelIndexMinute;
wheelLength=wheelIndexSecondLength;
wheelLengthNext=wheelIndexMinuteLength;
break;
case MILLISECONDS:
allTicksNext=wheelSecondAllTicks;
vectorsNext=wheelSecond;
index=wheelIndexMillisecond;
indexNext=wheelIndexSecond;
wheelLength=wheelIndexMillisecondLength;
wheelLengthNext=wheelIndexSecondLength;
break;
default:
throw new RuntimeException("Timeunit 參數錯誤");
}
index.getAndIncrement();
if(index.get()<wheelLength){
return true;
}
index.set(index.get()%wheelLength);
//如果是天數,因為當處理hours時候已經處理過天了,所以直接返回。
if(timeUnit.equals(TimeUnit.DAYS)){
return true;
}
//獲取下一個時間輪的任務,並添加
List<TimeWheelTask> taskList = vectorsNext[(indexNext.get() + 1) % wheelLengthNext];
if(null!=taskList){
for(TimeWheelTask task:taskList){
addTaskToWheel(task.delay%(allTicksNext),task);
}
taskList.clear();
}
return false;
}
public List<TimeWheelTask> getTaskList() {
return wheelMillisecond[wheelIndexMillisecond.get()];
}
//加入時間輪,判斷是否需要升級
void addTaskToWheel(long delay,TimeWheelTask task){
if(delay>=wheelIndexDayLength*wheelDayAllTicks*TICK){
throw new RuntimeException("不能超過一年");
}
if(addTaskToWheel(delay,task,TimeUnit.DAYS)){
return;
}
if(addTaskToWheel(delay,task,TimeUnit.HOURS)){
return;
}
if(addTaskToWheel(delay,task,TimeUnit.MINUTES)){
return;
}
if(addTaskToWheel(delay,task,TimeUnit.SECONDS)){
return;
}
addTaskToWheel(delay,task,TimeUnit.MILLISECONDS);
}
//添加任務到時間輪,
private boolean addTaskToWheel(long delay, TimeWheelTask timeWheelTask, TimeUnit timeUnit){
long allTicks;
Vector[] vectors;
AtomicInteger index;
int wheelLength;
switch (timeUnit){
case DAYS:
allTicks=wheelDayAllTicks;
vectors= wheelDay;
index=wheelIndexDay;
wheelLength=wheelIndexDayLength;
break;
case HOURS:
allTicks=wheelHourAllTicks;
vectors=wheelHour;
index=wheelIndexHour;
wheelLength=wheelIndexHourLength;
break;
case MINUTES:
allTicks=wheelMinuteAllTicks;
vectors=wheelMinute;
index=wheelIndexMinute;
wheelLength=wheelIndexMinuteLength;
break;
case SECONDS:
allTicks=wheelSecondAllTicks;
vectors=wheelSecond;
index=wheelIndexSecond;
wheelLength=wheelIndexSecondLength;
break;
case MILLISECONDS:
allTicks=wheelMillisecondAllTicks;
vectors=wheelMillisecond;
index=wheelIndexMillisecond;
wheelLength=wheelIndexMillisecondLength;
break;
default:
throw new RuntimeException("timeUnit 參數錯誤");
}
//添加到當前的索引
if(0!=delay/(allTicks*TICK) || timeUnit.equals(TimeUnit.MILLISECONDS)){
int indexNew=(index.get()+(int)(delay/(allTicks*TICK)))%wheelLength;
if(null==vectors[indexNew]){
vectors[indexNew]=new Vector();
}
vectors[indexNew].add(timeWheelTask);
return true;
}
return false;
}
//准確獲取當前需要添加的准確時間輪
long getWheelNowTime(long delay){
//當前時間 毫秒
long timeFromWheelStart=(wheelIndexDay.get()*wheelDayAllTicks+wheelIndexHour.get()*wheelHourAllTicks+wheelIndexMinute.get()*wheelMinuteAllTicks
+wheelIndexSecond.get()*wheelSecondAllTicks+wheelIndexMillisecond.get()*wheelMillisecondAllTicks)*TICK;
//從大到小開始處理,是否大於1天
if(0!=delay/(wheelDayAllTicks*TICK)){
return timeFromWheelStart%(wheelDayAllTicks*TICK);
}
//大於1小時
if(0!=delay/(wheelHourAllTicks*TICK)){
return timeFromWheelStart%(wheelHourAllTicks*TICK);
}
//大於1分鍾
if(0!=delay/(wheelMinuteAllTicks*TICK)){
return timeFromWheelStart%(wheelMinuteAllTicks*TICK);
}
//大於1秒
if(0!=delay/(wheelSecondAllTicks*TICK)){
return timeFromWheelStart%(wheelSecondAllTicks*TICK);
}
return 0;
}
}
/**
* 時間輪的task
*/
class TimeWheelTask implements Runnable{
private long delay;
//-1 為手動取消、0為1次定時、大於0表示正常調度間隔
private long period;
private Runnable runnable;
public void setDelay(long delay) {
this.delay = delay;
}
TimeWheelTask(long delay, long period, Runnable runnable){
this.delay=delay;
this.period=period;
this.runnable=runnable;
}
/**
* 判斷是否是周期性的調度任務
* @return
*/
public boolean isPeriodSchedule(){
return period>0;
}
@Override
public void run() {
if(this.period==-1){
return;
}
runnable.run();
}
/**
* 手動消除調度
*/
public void cancel(){
this.period=-1;
}
}
//時間輪 main線程
class TimeWheelThread extends Thread{
public TimeWheelThread(){
super("TimeWheel_main");
}
public TimeWheelThread(String thread_name){
super(thread_name);
}
@Override
public void run() {
try {
mainLoop();
}catch (Exception e){
System.out.println(e);
}finally {
System.out.println(111);
}
}
private void mainLoop() {
while (true){
//運行任務
runTask(timeWheel.getTaskList());
//時間增加 one-tick
timeWheel.incrTick();
//休眠
try {
TimeUnit.MILLISECONDS.sleep(TICK);
} catch (InterruptedException e) {
e.printStackTrace();
throw new RuntimeException(e);
}
}
}
private void runTask(List<TimeWheelTask> taskList) {
if(taskList==null || taskList.size()==0) return;
for(TimeWheelTask timeWheelTask:taskList){
threadPoolExecutor.execute(timeWheelTask);
if(timeWheelTask.isPeriodSchedule()){
timeWheelTask.setDelay(timeWheelTask.period);
schedule(timeWheelTask);
}
}
taskList.clear();
}
}
}
配合delayQueue實現空間換時間,對時間輪進行推進。參考:https://github.com/anurnomeru/Solution.git
角色
Bucket:槽位:鏈式環形存儲任務
Timer:主循環:推進時間輪、持有工作線程池(實際執行任務的)、主循環線程池
Timewheel: 時間輪:(持有父級時間輪引用),添加任務、時間Tick(間隔)、持有Bucket size,當前時間
TimeTask: 運行任務:作為Bucket 持有的任務引用、失效時間、取消狀態等,
- 創建DelayQueue, TimeWheel以及附屬(上級)時間輪持有一個queue
- 可以實現對時間輪中的槽位進行改造,實現槽內任務鏈式存儲、槽位本身實現Delayed接口,配合DelayQueue實現時間輪推進
- 每一個任務添加時,對應的槽位判斷是否添加到queue,如果時間超時超出當前時間輪范圍,則獲取上級時間輪(如果不存在)則創建。
- 時間輪推進則依靠queue.poll(timeout),如果有最近到期任務,就通過timewheel推進時間輪。
使用ScheduledThreadPoll
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(2);
ScheduledFuture<?> scheduledFuture = scheduledExecutorService.scheduleAtFixedRate(() -> {
System.out.println(new Date());
}, 10, 20, TimeUnit.SECONDS);