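/*
 * 1. Thread-based task scheduling. When the same task is submitted repeatedly, execution follows
 *    queue (first-in, first-out) order.
 *    - waitForNextTick(): the waiting step; customize it for the business scenario.
 *    - notifyExpired(): invokes the business logic (the registered expiration listeners).
 *    - run(): once the queued tasks have been executed, the worker thread goes back to waiting.
 */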
import java.util.LinkedList;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
/**
 * Fixed-delay expiration queue driven by a single worker thread.
 *
 * <p>Every {@link #add(Object)} stamps the element with the current time and appends it, in its
 * own slot, to a FIFO queue. The worker thread always looks at the head slot only: it waits until
 * {@code tickDuration} milliseconds have passed since that slot was created, then removes the slot
 * and calls {@code expired(e)} on every registered listener for each element still in it.
 *
 * <p>While the head slot is still pending, the worker also calls {@code expired(e)} on every
 * listener for the slot's elements after each one-second poll, in addition to the final call made
 * when the slot is dropped.
 *
 * <p>Bookkeeping for an element is guarded by that element's own monitor
 * ({@code synchronized (e)}); listeners are always invoked outside that monitor, on the worker
 * thread.
 *
 * @param <E> element type; used as a {@link ConcurrentHashMap} key, so it must not be
 *            {@code null}
 */
public class TimingWheel<E> {

    /** Interval, in milliseconds, at which the worker re-checks an empty queue or a pending head slot. */
    private static final long POLL_INTERVAL_MILLIS = 1000L;

    /** Delay between adding an element and its expiration, in milliseconds. */
    private final long tickDuration;

    /**
     * Pending slots in insertion order. Caller threads append, the worker thread reads and removes
     * the head, so a concurrent queue is required.
     */
    private final Queue<Slot<E>> wheel = new ConcurrentLinkedQueue<>();

    /** Listeners that receive the business callback. */
    private final CopyOnWriteArrayList<ExpirationListener<E>> expirationListeners =
            new CopyOnWriteArrayList<ExpirationListener<E>>();

    /** Maps each tracked element to the slot that currently holds it; keeps elements unique. */
    private final Map<E, Slot<E>> indicator = new ConcurrentHashMap<>();

    /** Set once by {@link #stop()}; the worker exits when it observes {@code true}. */
    private final AtomicBoolean shutdown = new AtomicBoolean(false);

    /** Thread that waits on the head slot and fires the listeners. */
    private final Thread workerThread;

    // ~ -------------------------------------------------------------------------------------------------------------

    /**
     * Creates a stopped wheel; call {@link #start()} to launch the worker thread.
     *
     * @param tickDuration delay between {@link #add(Object)} and expiration, expressed in
     *                     {@code timeUnit}; converted to milliseconds
     * @param timeUnit     unit of {@code tickDuration}
     * @param room         value appended to the worker thread name ({@code "Timing-Wheel:" + room})
     * @throws NullPointerException     if {@code timeUnit} is {@code null}
     * @throws IllegalArgumentException if {@code tickDuration} is not positive
     */
    public TimingWheel(int tickDuration, TimeUnit timeUnit, int room) {
        if (timeUnit == null) {
            throw new NullPointerException("unit");
        }
        if (tickDuration <= 0) {
            throw new IllegalArgumentException("tickDuration must be greater than 0: " + tickDuration);
        }
        this.tickDuration = TimeUnit.MILLISECONDS.convert(tickDuration, timeUnit);
        this.workerThread = new Thread(new TickWorker(), "Timing-Wheel:" + room);
    }

    /**
     * Starts the worker thread if it is not already running.
     *
     * @throws IllegalStateException if {@link #stop()} has already been called
     */
    public void start() {
        if (shutdown.get()) {
            throw new IllegalStateException("Cannot be started once stopped");
        }
        if (!workerThread.isAlive()) {
            workerThread.start();
        }
    }

    /**
     * Stops the worker thread and waits for it to terminate.
     *
     * <p>The worker is interrupted repeatedly and joined in 100 ms slices until it is no longer
     * alive. If the calling thread is interrupted while waiting, its interrupt status is restored
     * before returning.
     *
     * @return {@code true} if this call performed the shutdown, {@code false} if the wheel had
     *         already been stopped
     */
    public boolean stop() {
        if (!shutdown.compareAndSet(false, true)) {
            return false;
        }
        boolean interrupted = false;
        while (workerThread.isAlive()) {
            workerThread.interrupt();
            try {
                workerThread.join(100);
            } catch (InterruptedException e) {
                interrupted = true;
            }
        }
        if (interrupted) {
            Thread.currentThread().interrupt();
        }
        return true;
    }

    /**
     * Registers a listener to be called back by the worker thread.
     *
     * @param listener listener to add
     */
    public void addExpirationListener(ExpirationListener<E> listener) {
        expirationListeners.add(listener);
    }

    /**
     * Unregisters a previously added listener.
     *
     * @param listener listener to remove
     */
    public void removeExpirationListener(ExpirationListener<E> listener) {
        expirationListeners.remove(listener);
    }

    /**
     * Schedules {@code e} for expiration. If {@code e} is already tracked it is taken out of its
     * previous slot first, so its delay restarts from now.
     *
     * @param e element to schedule
     * @return the configured delay in milliseconds
     * @throws NullPointerException if {@code e} is {@code null}
     */
    public long add(E e) {
        synchronized (e) {
            checkAdd(e);
            Slot<E> slot = new Slot<>(System.currentTimeMillis());
            slot.add(e);
            // Register the owner before the slot becomes visible to the worker thread.
            indicator.put(e, slot);
            wheel.offer(slot);
            return tickDuration;
        }
    }

    /**
     * Returns the number of elements currently tracked.
     *
     * @return size of the element-to-slot index
     */
    public long getSize() {
        return indicator.size();
    }

    /** Detaches {@code e} from the slot it currently lives in, if any, so it can be re-added. */
    private void checkAdd(E e) {
        Slot<E> slot = indicator.get(e);
        if (slot != null) {
            slot.remove(e);
        }
    }

    /**
     * Cancels the pending expiration of {@code e}.
     *
     * @param e element to cancel
     * @return {@code true} if {@code e} was tracked and has been taken out of its slot
     * @throws NullPointerException if {@code e} is {@code null}
     */
    public boolean remove(E e) {
        synchronized (e) {
            Slot<E> slot = indicator.get(e);
            if (slot == null) {
                return false;
            }
            indicator.remove(e);
            return slot.remove(e) != null;
        }
    }

    /**
     * Drops the head slot and fires the listeners for every element that still belongs to it.
     * An element that was removed or re-added to a newer slot in the meantime is skipped, and
     * the mapping of a re-added element is left untouched.
     */
    private void notifyExpired() {
        Slot<E> slot = wheel.poll();
        if (slot == null) {
            return;
        }
        for (E e : slot.elements()) {
            boolean ownedByThisSlot;
            synchronized (e) {
                // Identity comparison: only the slot that currently owns the element may untrack it.
                ownedByThisSlot = indicator.get(e) == slot;
                if (ownedByThisSlot) {
                    indicator.remove(e);
                }
            }
            if (!ownedByThisSlot) {
                continue;
            }
            // Listeners run outside the element's monitor.
            for (ExpirationListener<E> listener : expirationListeners) {
                listener.expired(e);
            }
        }
    }

    /**
     * Fires the listeners for the elements of the head slot without removing the slot. Invoked
     * after every poll interval while the head slot is still pending.
     */
    private void callExpired() {
        Slot<E> slot = wheel.peek();
        if (slot == null) {
            return;
        }
        for (E e : slot.elements()) {
            for (ExpirationListener<E> listener : expirationListeners) {
                listener.expired(e);
            }
        }
    }

    // ~ -------------------------------------------------------------------------------------------------------------

    /** Worker loop: idle while the queue is empty, otherwise wait on the head slot and expire it. */
    private class TickWorker implements Runnable {

        @Override
        public void run() {
            while (!shutdown.get()) {
                try {
                    if (wheel.isEmpty()) {
                        Thread.sleep(POLL_INTERVAL_MILLIS);
                        continue;
                    }
                    // Wait for the head slot to become due.
                    waitForNextTick();
                    // Run the business callbacks and drop the slot.
                    notifyExpired();
                } catch (InterruptedException ignored) {
                    // stop() interrupts this thread; fall through so the loop condition re-checks
                    // the shutdown flag. If the wheel has not been stopped, waiting simply resumes.
                    // The head slot is not expired early.
                }
            }
        }

        /**
         * Blocks until {@code tickDuration} milliseconds have elapsed since the head slot was
         * created, calling {@link TimingWheel#callExpired()} after each poll interval.
         *
         * @throws InterruptedException if the worker is interrupted while sleeping
         */
        private void waitForNextTick() throws InterruptedException {
            Slot<E> slot = wheel.peek();
            if (slot == null) {
                return;
            }
            // The delay is measured from the moment the slot was added.
            long waitStart = slot.currentAddTime == 0 ? System.currentTimeMillis() : slot.currentAddTime;
            while (tickDuration - (System.currentTimeMillis() - waitStart) > 0) {
                Thread.sleep(POLL_INTERVAL_MILLIS);
                callExpired();
            }
        }
    }

    /**
     * Container for the elements added by one {@link TimingWheel#add(Object)} call. Slots are
     * compared by identity, so {@code equals}/{@code hashCode} are deliberately not overridden.
     */
    private static class Slot<E> {

        /** Wall-clock time, in milliseconds, at which the slot was created. */
        private final long currentAddTime;

        /** Elements held by this slot; the map is used as a concurrent set. */
        private final Map<E, E> elements = new ConcurrentHashMap<E, E>();

        public Slot(long currentAddTime) {
            this.currentAddTime = currentAddTime;
        }

        public void add(E e) {
            elements.put(e, e);
        }

        /** Removes {@code e}; returns the removed element, or {@code null} if it was not present. */
        public E remove(E e) {
            return elements.remove(e);
        }

        /** Returns a live view of the elements in this slot. */
        public Set<E> elements() {
            return elements.keySet();
        }

        @Override
        public String toString() {
            return "Slot [currentAddTime=" + currentAddTime + ", elements=" + elements + "]";
        }
    }
}
