perf[scheduler]: 使用更加轻量级的实现替代比较重的时间轮算法

This commit is contained in:
jaysunxiao
2021-10-10 21:56:07 +08:00
parent 5a3fc698c8
commit 5cb953f4e8
8 changed files with 99 additions and 349 deletions
@@ -60,7 +60,7 @@ public class CronOrmPersister extends AbstractOrmPersister {
var delay = 0L;
try {
var now = TimeUtils.now();
var nextTimestamp = TimeUtils.getNextTimestampByCronExpression(cronExpression, now);
var nextTimestamp = TimeUtils.nextTimestampByCronExpression(cronExpression, now);
delay = nextTimestamp - now;
if (delay < 0) {
@@ -13,14 +13,17 @@
package com.zfoo.scheduler.manager;
import com.zfoo.protocol.collection.CollectionUtils;
import com.zfoo.scheduler.SchedulerContext;
import com.zfoo.scheduler.model.vo.SchedulerDefinition;
import com.zfoo.scheduler.timeWheelUtils.Timer;
import com.zfoo.scheduler.timeWheelUtils.TimerTask;
import com.zfoo.scheduler.util.TimeUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.time.Instant;
import java.time.ZonedDateTime;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
@@ -33,30 +36,94 @@ public abstract class SchedulerBus {
private static final Logger logger = LoggerFactory.getLogger(SchedulerBus.class);
private static Timer timer = new Timer();
public static final long TRIGGER_MILLIS_INTERVAL = TimeUtils.MILLIS_PER_SECOND;
private static final List<SchedulerDefinition> schedulerDefList = new CopyOnWriteArrayList<>();
/**
* scheduler默认只有一个单线程的线程池
*/
public static final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(new SchedulerThreadFactory(1));
private static final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(new SchedulerThreadFactory(1));
/**
* 上一次trigger触发时间
*/
private static long lastTriggerTimestamp = 0L;
public static final long TRIGGER_MILLIS_INTERVAL = TimeUtils.MILLIS_PER_SECOND;
/**
* 在scheduler中,最小的triggerTimestamp
*/
private static long minTriggerTimestamp = 0L;
static {
executor.scheduleAtFixedRate(() -> {
try {
timer.advanceClock(10);
triggerPerSecond();
} catch (Exception e) {
logger.error("scheduler triggers an error.", e);
}
}, 0, TRIGGER_MILLIS_INTERVAL, TimeUnit.MILLISECONDS);
}
public static void refreshMinTriggerTimestamp() {
var minTimestamp = Long.MAX_VALUE;
for (var scheduler : schedulerDefList) {
if (scheduler.getTriggerTimestamp() < minTimestamp) {
minTimestamp = scheduler.getTriggerTimestamp();
}
}
minTriggerTimestamp = minTimestamp;
}
/**
* 每一秒执行一次,如果这个任务执行时间过长超过,比如10秒,执行完成后,不会再执行10次
*/
private static void triggerPerSecond() {
var currentTimeMillis = TimeUtils.currentTimeMillis();
if (CollectionUtils.isEmpty(schedulerDefList)) {
return;
}
// 有人向前调整过机器时间,重新计算scheduler里的triggerTimestamp
// var diff = timestamp - lastTriggerTimestamp;
if (currentTimeMillis < lastTriggerTimestamp) {
for (SchedulerDefinition schedulerDef : schedulerDefList) {
var nextTriggerTimestamp = TimeUtils.nextTimestampByCronExpression(schedulerDef.getCronExpression(), currentTimeMillis);
schedulerDef.setTriggerTimestamp(nextTriggerTimestamp);
}
refreshMinTriggerTimestamp();
}
// diff > 0, 没有人调整时间或者有人向后调整过机器时间,可以忽略,因为向后调整时间时间戳一定会大于triggerTimestamp,所以一定会触发
lastTriggerTimestamp = currentTimeMillis;
// 如果minSchedulerTriggerTimestamp大于timestamp,说明没有可执行的scheduler
if (currentTimeMillis < minTriggerTimestamp) {
return;
}
var minTimestamp = Long.MAX_VALUE;
var timestampZonedDataTime = ZonedDateTime.ofInstant(Instant.ofEpochMilli(currentTimeMillis), TimeUtils.DEFAULT_ZONE_ID);
for (var scheduler : schedulerDefList) {
var triggerTimestamp = scheduler.getTriggerTimestamp();
if (triggerTimestamp <= currentTimeMillis) {
// 到达触发时间,则执行runnable方法
scheduler.getScheduler().invoke();
// 重新设置下一次的触发时间戳
triggerTimestamp = TimeUtils.nextTimestampByCronExpression(scheduler.getCronExpression(), timestampZonedDataTime);
scheduler.setTriggerTimestamp(triggerTimestamp);
}
if (triggerTimestamp < minTimestamp) {
minTimestamp = scheduler.getTriggerTimestamp();
}
}
minTriggerTimestamp = minTimestamp;
}
public static void registerScheduler(SchedulerDefinition scheduler) {
var timerTask = new TimerTask(scheduler.getTriggerTimestamp(), () -> {
scheduler.getScheduler().invoke();
refreshTask(scheduler);
});
timer.addTask(timerTask);
schedulerDefList.add(scheduler);
refreshMinTriggerTimestamp();
}
@@ -105,14 +172,14 @@ public abstract class SchedulerBus {
}, delay, unit);
}
public static void refreshTask(SchedulerDefinition schedulerDefinition) {
var timestamp = TimeUtils.currentTimeMillis();
var nextTriggerTimestamp = TimeUtils.getNextTimestampByCronExpression(schedulerDefinition.getCronExpression(), timestamp);
schedulerDefinition.setTriggerTimestamp(nextTriggerTimestamp);
var timerTask = new TimerTask(schedulerDefinition.getTriggerTimestamp(), () -> {
schedulerDefinition.getScheduler().invoke();
refreshTask(schedulerDefinition);
});
timer.addTask(timerTask);
/**
* cron表达式执行的任务
*/
public static void scheduleCron(Runnable runnable, String cron) {
if (SchedulerContext.isStop()) {
return;
}
schedulerDefList.add(SchedulerDefinition.valueOf(cron, runnable));
}
}
@@ -43,7 +43,7 @@ public class SchedulerDefinition {
schedulerDef.cronExpression = cronExpression;
// 字节码增强,避免反射
schedulerDef.scheduler = EnhanceUtils.createScheduler(ReflectScheduler.valueOf(bean, method));
schedulerDef.triggerTimestamp = TimeUtils.getNextTimestampByCronExpression(cronExpression, TimeUtils.currentTimeMillis());
schedulerDef.triggerTimestamp = TimeUtils.nextTimestampByCronExpression(cronExpression, TimeUtils.currentTimeMillis());
ReflectionUtils.makeAccessible(method);
return schedulerDef;
}
@@ -53,7 +53,7 @@ public class SchedulerDefinition {
var cronExpression = CronExpression.parse(cron);
schedulerDef.cronExpression = cronExpression;
schedulerDef.scheduler = RunnableScheduler.valueOf(runnable);
schedulerDef.triggerTimestamp = TimeUtils.getNextTimestampByCronExpression(cronExpression, TimeUtils.currentTimeMillis());
schedulerDef.triggerTimestamp = TimeUtils.nextTimestampByCronExpression(cronExpression, TimeUtils.currentTimeMillis());
return schedulerDef;
}
@@ -1,98 +0,0 @@
package com.zfoo.scheduler.timeWheelUtils;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
/**
* 时间槽
*/
public class Bucket implements Delayed {
/**
* 过期时间
*/
private AtomicLong expiration = new AtomicLong(-1L);
/**
* 傀儡节点
*/
private TimerTask root = new TimerTask(-1L, null);
{
root.pre = root;
root.next = root;
}
/**
* 设置过期时间
*/
public boolean setExpiration(long expire) {
return expiration.getAndSet(expire) != expire;
}
/**
* 获取过期时间
*/
public long getExpiration() {
return expiration.get();
}
/**
* 新增任务,加入链表尾部
*/
public void addTask(TimerTask timerTask) {
synchronized (this) {
if (timerTask.bucket == null) {
timerTask.bucket = this;
TimerTask tail = root.pre;
timerTask.next = root;
timerTask.pre = tail;
tail.next = timerTask;
root.pre = timerTask;
}
}
}
/**
* 移除任务
*/
public void removeTask(TimerTask timerTask) {
synchronized (this) {
if (timerTask.bucket.equals(this)) {
timerTask.next.pre = timerTask.pre;
timerTask.pre.next = timerTask.next;
timerTask.bucket = null;
timerTask.next = null;
timerTask.pre = null;
}
}
}
/**
* 重新分配
*/
public void flush(Consumer<TimerTask> flush) {
TimerTask timerTask = root.next;
while (!timerTask.equals(root)) {
this.removeTask(timerTask);
flush.accept(timerTask);
timerTask = root.next;
}
expiration.set(-1L);
}
@Override
public long getDelay(TimeUnit unit) {
return Math.max(0, unit.convert(expiration.get() - System.currentTimeMillis(), TimeUnit.MILLISECONDS));
}
@Override
public int compareTo(Delayed o) {
if (o instanceof Bucket) {
return Long.compare(expiration.get(), ((Bucket) o).expiration.get());
}
return 0;
}
}
@@ -1,112 +0,0 @@
package com.zfoo.scheduler.timeWheelUtils;
import java.util.concurrent.DelayQueue;
/**
* 时间轮
*/
public class TimeWheel {
/**
* 一个时间槽的范围
*/
private long tickMs;
/**
* 时间轮大小
*/
private int wheelSize;
/**
* 时间轮的范围
*/
private long interval;
/**
* 时间槽
*/
private Bucket[] buckets;
/**
* 当前时间
*/
private long currentTime;
/**
* 上层时间轮
*/
private volatile TimeWheel overflowWheel;
/**
* 一个Timer只有一个delayQueue
*/
private DelayQueue<Bucket> delayQueue;
public TimeWheel(long tickMs, int wheelSize, long currentTime, DelayQueue<Bucket> delayQueue) {
this.currentTime = currentTime;
this.tickMs = tickMs;
this.wheelSize = wheelSize;
this.interval = tickMs * wheelSize;
this.buckets = new Bucket[wheelSize];
//currentTime为tickMs的整数倍 这里做取整操作
this.currentTime = currentTime - (currentTime % tickMs);
this.delayQueue = delayQueue;
for (int i = 0; i < wheelSize; i++) {
buckets[i] = new Bucket();
}
}
/**
* 创建或者获取上层时间轮
*/
private TimeWheel getOverflowWheel() {
if (overflowWheel == null) {
synchronized (this) {
if (overflowWheel == null) {
overflowWheel = new TimeWheel(interval, wheelSize, currentTime, delayQueue);
}
}
}
return overflowWheel;
}
/**
* 添加任务到时间轮
*/
public boolean addTask(TimerTask timerTask) {
long expiration = timerTask.getDelayMs();
//过期任务直接执行
if (expiration < currentTime + tickMs) {
return false;
} else if (expiration < currentTime + interval) {
//当前时间轮可以容纳该任务 加入时间槽
Long bound = expiration / tickMs;
int bucketIndex = (int) (bound % wheelSize);
//System.out.println("tickMs:" + tickMs + "------bucketIndex:" + bucketIndex + "------expiration:" + expiration);
Bucket bucket = buckets[bucketIndex];
bucket.addTask(timerTask);
if (bucket.setExpiration(bound * tickMs)) {
//添加到delayQueue中
delayQueue.offer(bucket);
}
} else {
//放到上一层的时间轮
TimeWheel timeWheel = getOverflowWheel();
timeWheel.addTask(timerTask);
}
return true;
}
/**
* 推进时间
*/
public void advanceClock(long timestamp) {
if (timestamp >= currentTime + tickMs) {
currentTime = timestamp - (timestamp % tickMs);
if (overflowWheel != null) {
//推进上层时间轮时间
this.getOverflowWheel().advanceClock(timestamp);
}
}
}
}
@@ -1,61 +0,0 @@
package com.zfoo.scheduler.timeWheelUtils;
import com.zfoo.scheduler.manager.SchedulerThreadFactory;
import com.zfoo.scheduler.util.TimeUtils;
import java.util.concurrent.*;
import static com.zfoo.scheduler.manager.SchedulerBus.executor;
/**
* 定时器
*/
public class Timer {
/**
* 底层时间轮
*/
private TimeWheel timeWheel;
/**
* 一个Timer只有一个delayQueue
*/
private DelayQueue<Bucket> delayQueue = new DelayQueue<>();
/**
* 构造函数
*/
public Timer() {
timeWheel = new TimeWheel(1000, 20, TimeUtils.currentTimeMillis(), delayQueue);
}
/**
* 添加任务
*/
public void addTask(TimerTask timerTask) {
//添加失败任务直接执行
if (!timeWheel.addTask(timerTask)) {
executor.submit(timerTask.getTask());
}
}
/**
* 获取过期任务
*/
public void advanceClock(long timestamp) {
try {
//阻塞获取队头元素
Bucket bucket = delayQueue.poll(timestamp, TimeUnit.MILLISECONDS);
if (bucket != null) {
//推进时间
timeWheel.advanceClock(bucket.getExpiration());
//执行过期任务(包含降级操作)
bucket.flush(this::addTask);
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
@@ -1,51 +0,0 @@
package com.zfoo.scheduler.timeWheelUtils;
/**
* 任务
*/
public class TimerTask {
/**
* 延迟时间
*/
private long delayMs;
/**
* 任务
*/
private Runnable task;
/**
* 时间槽
*/
protected Bucket bucket;
/**
* 下一个节点
*/
protected TimerTask next;
/**
* 上一个节点
*/
protected TimerTask pre;
public TimerTask(long delayMs, Runnable task) {
this.delayMs = delayMs;
this.task = task;
this.bucket = null;
this.next = null;
this.pre = null;
}
public Runnable getTask() {
return task;
}
public long getDelayMs() {
return delayMs;
}
}
@@ -82,7 +82,7 @@ public abstract class TimeUtils {
static {
currentTimeMillis();
// 调用一下静态方法,使SchedulerBus静态代码块初始化
//SchedulerBus.refreshMinTriggerTimestamp();
SchedulerBus.refreshMinTriggerTimestamp();
}
/**
@@ -364,8 +364,13 @@ public abstract class TimeUtils {
}
// --------------------------------------cron表达式--------------------------------------
public static long getNextTimestampByCronExpression(CronExpression expression, long currentTimestamp) {
var next = expression.next(ZonedDateTime.ofInstant(Instant.ofEpochMilli(currentTimestamp), DEFAULT_ZONE_ID));
public static long nextTimestampByCronExpression(CronExpression expression, long currentTimestamp) {
var zonedDateTime = ZonedDateTime.ofInstant(Instant.ofEpochMilli(currentTimestamp), DEFAULT_ZONE_ID);
return nextTimestampByCronExpression(expression, zonedDateTime);
}
public static long nextTimestampByCronExpression(CronExpression expression, ZonedDateTime zonedDateTime) {
var next = expression.next(zonedDateTime);
if (next == null) {
return Long.MAX_VALUE;