From 5cb953f4e8afda109d8c9cb0c8c788a1d70ef4ff Mon Sep 17 00:00:00 2001 From: jaysunxiao Date: Sun, 10 Oct 2021 21:56:07 +0800 Subject: [PATCH] =?UTF-8?q?perf[scheduler]:=20=E4=BD=BF=E7=94=A8=E6=9B=B4?= =?UTF-8?q?=E5=8A=A0=E8=BD=BB=E9=87=8F=E7=BA=A7=E7=9A=84=E5=AE=9E=E7=8E=B0?= =?UTF-8?q?=E6=9B=BF=E4=BB=A3=E6=AF=94=E8=BE=83=E9=87=8D=E7=9A=84=E6=97=B6?= =?UTF-8?q?=E9=97=B4=E8=BD=AE=E7=AE=97=E6=B3=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../orm/model/persister/CronOrmPersister.java | 2 +- .../zfoo/scheduler/manager/SchedulerBus.java | 109 +++++++++++++---- .../model/vo/SchedulerDefinition.java | 4 +- .../zfoo/scheduler/timeWheelUtils/Bucket.java | 98 --------------- .../scheduler/timeWheelUtils/TimeWheel.java | 112 ------------------ .../zfoo/scheduler/timeWheelUtils/Timer.java | 61 ---------- .../scheduler/timeWheelUtils/TimerTask.java | 51 -------- .../com/zfoo/scheduler/util/TimeUtils.java | 11 +- 8 files changed, 99 insertions(+), 349 deletions(-) delete mode 100644 scheduler/src/main/java/com/zfoo/scheduler/timeWheelUtils/Bucket.java delete mode 100644 scheduler/src/main/java/com/zfoo/scheduler/timeWheelUtils/TimeWheel.java delete mode 100644 scheduler/src/main/java/com/zfoo/scheduler/timeWheelUtils/Timer.java delete mode 100644 scheduler/src/main/java/com/zfoo/scheduler/timeWheelUtils/TimerTask.java diff --git a/orm/src/main/java/com/zfoo/orm/model/persister/CronOrmPersister.java b/orm/src/main/java/com/zfoo/orm/model/persister/CronOrmPersister.java index 18078eed..679cc0f6 100644 --- a/orm/src/main/java/com/zfoo/orm/model/persister/CronOrmPersister.java +++ b/orm/src/main/java/com/zfoo/orm/model/persister/CronOrmPersister.java @@ -60,7 +60,7 @@ public class CronOrmPersister extends AbstractOrmPersister { var delay = 0L; try { var now = TimeUtils.now(); - var nextTimestamp = TimeUtils.getNextTimestampByCronExpression(cronExpression, now); + var nextTimestamp = TimeUtils.nextTimestampByCronExpression(cronExpression, now); delay = nextTimestamp - now; if (delay < 0) { diff --git a/scheduler/src/main/java/com/zfoo/scheduler/manager/SchedulerBus.java b/scheduler/src/main/java/com/zfoo/scheduler/manager/SchedulerBus.java index 04f5706c..4b7a28c9 100644 --- a/scheduler/src/main/java/com/zfoo/scheduler/manager/SchedulerBus.java +++ b/scheduler/src/main/java/com/zfoo/scheduler/manager/SchedulerBus.java @@ -13,14 +13,17 @@ package com.zfoo.scheduler.manager; +import com.zfoo.protocol.collection.CollectionUtils; import com.zfoo.scheduler.SchedulerContext; import com.zfoo.scheduler.model.vo.SchedulerDefinition; -import com.zfoo.scheduler.timeWheelUtils.Timer; -import com.zfoo.scheduler.timeWheelUtils.TimerTask; import com.zfoo.scheduler.util.TimeUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.time.Instant; +import java.time.ZonedDateTime; +import java.util.List; +import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; @@ -33,30 +36,94 @@ public abstract class SchedulerBus { private static final Logger logger = LoggerFactory.getLogger(SchedulerBus.class); - private static Timer timer = new Timer(); - - public static final long TRIGGER_MILLIS_INTERVAL = TimeUtils.MILLIS_PER_SECOND; + private static final List schedulerDefList = new CopyOnWriteArrayList<>(); /** * scheduler默认只有一个单线程的线程池 */ - public static final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(new SchedulerThreadFactory(1)); + private static final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(new SchedulerThreadFactory(1)); + /** + * 上一次trigger触发时间 + */ + private static long lastTriggerTimestamp = 0L; + + + public static final long TRIGGER_MILLIS_INTERVAL = TimeUtils.MILLIS_PER_SECOND; + /** + * 在scheduler中,最小的triggerTimestamp + */ + private static long minTriggerTimestamp = 0L; static { executor.scheduleAtFixedRate(() -> { try { - timer.advanceClock(10); + triggerPerSecond(); } catch (Exception e) { logger.error("scheduler triggers an error.", e); } }, 0, TRIGGER_MILLIS_INTERVAL, TimeUnit.MILLISECONDS); } + + public static void refreshMinTriggerTimestamp() { + var minTimestamp = Long.MAX_VALUE; + for (var scheduler : schedulerDefList) { + if (scheduler.getTriggerTimestamp() < minTimestamp) { + minTimestamp = scheduler.getTriggerTimestamp(); + } + } + minTriggerTimestamp = minTimestamp; + } + + /** + * 每一秒执行一次,如果这个任务执行时间过长超过,比如10秒,执行完成后,不会再执行10次 + */ + private static void triggerPerSecond() { + var currentTimeMillis = TimeUtils.currentTimeMillis(); + + if (CollectionUtils.isEmpty(schedulerDefList)) { + return; + } + + + // 有人向前调整过机器时间,重新计算scheduler里的triggerTimestamp + // var diff = timestamp - lastTriggerTimestamp; + if (currentTimeMillis < lastTriggerTimestamp) { + for (SchedulerDefinition schedulerDef : schedulerDefList) { + var nextTriggerTimestamp = TimeUtils.nextTimestampByCronExpression(schedulerDef.getCronExpression(), currentTimeMillis); + schedulerDef.setTriggerTimestamp(nextTriggerTimestamp); + } + refreshMinTriggerTimestamp(); + } + + // diff > 0, 没有人调整时间或者有人向后调整过机器时间,可以忽略,因为向后调整时间时间戳一定会大于triggerTimestamp,所以一定会触发 + lastTriggerTimestamp = currentTimeMillis; + + // 如果minSchedulerTriggerTimestamp大于timestamp,说明没有可执行的scheduler + if (currentTimeMillis < minTriggerTimestamp) { + return; + } + + var minTimestamp = Long.MAX_VALUE; + var timestampZonedDataTime = ZonedDateTime.ofInstant(Instant.ofEpochMilli(currentTimeMillis), TimeUtils.DEFAULT_ZONE_ID); + for (var scheduler : schedulerDefList) { + var triggerTimestamp = scheduler.getTriggerTimestamp(); + if (triggerTimestamp <= currentTimeMillis) { + // 到达触发时间,则执行runnable方法 + scheduler.getScheduler().invoke(); + // 重新设置下一次的触发时间戳 + triggerTimestamp = TimeUtils.nextTimestampByCronExpression(scheduler.getCronExpression(), timestampZonedDataTime); + scheduler.setTriggerTimestamp(triggerTimestamp); + } + if (triggerTimestamp < minTimestamp) { + minTimestamp = scheduler.getTriggerTimestamp(); + } + } + minTriggerTimestamp = minTimestamp; + } + public static void registerScheduler(SchedulerDefinition scheduler) { - var timerTask = new TimerTask(scheduler.getTriggerTimestamp(), () -> { - scheduler.getScheduler().invoke(); - refreshTask(scheduler); - }); - timer.addTask(timerTask); + schedulerDefList.add(scheduler); + refreshMinTriggerTimestamp(); } @@ -105,14 +172,14 @@ public abstract class SchedulerBus { }, delay, unit); } - public static void refreshTask(SchedulerDefinition schedulerDefinition) { - var timestamp = TimeUtils.currentTimeMillis(); - var nextTriggerTimestamp = TimeUtils.getNextTimestampByCronExpression(schedulerDefinition.getCronExpression(), timestamp); - schedulerDefinition.setTriggerTimestamp(nextTriggerTimestamp); - var timerTask = new TimerTask(schedulerDefinition.getTriggerTimestamp(), () -> { - schedulerDefinition.getScheduler().invoke(); - refreshTask(schedulerDefinition); - }); - timer.addTask(timerTask); + /** + * cron表达式执行的任务 + */ + public static void scheduleCron(Runnable runnable, String cron) { + if (SchedulerContext.isStop()) { + return; + } + + schedulerDefList.add(SchedulerDefinition.valueOf(cron, runnable)); } } diff --git a/scheduler/src/main/java/com/zfoo/scheduler/model/vo/SchedulerDefinition.java b/scheduler/src/main/java/com/zfoo/scheduler/model/vo/SchedulerDefinition.java index 9013c9e8..ca07d211 100644 --- a/scheduler/src/main/java/com/zfoo/scheduler/model/vo/SchedulerDefinition.java +++ b/scheduler/src/main/java/com/zfoo/scheduler/model/vo/SchedulerDefinition.java @@ -43,7 +43,7 @@ public class SchedulerDefinition { schedulerDef.cronExpression = cronExpression; // 字节码增强,避免反射 schedulerDef.scheduler = EnhanceUtils.createScheduler(ReflectScheduler.valueOf(bean, method)); - schedulerDef.triggerTimestamp = TimeUtils.getNextTimestampByCronExpression(cronExpression, TimeUtils.currentTimeMillis()); + schedulerDef.triggerTimestamp = TimeUtils.nextTimestampByCronExpression(cronExpression, TimeUtils.currentTimeMillis()); ReflectionUtils.makeAccessible(method); return schedulerDef; } @@ -53,7 +53,7 @@ public class SchedulerDefinition { var cronExpression = CronExpression.parse(cron); schedulerDef.cronExpression = cronExpression; schedulerDef.scheduler = RunnableScheduler.valueOf(runnable); - schedulerDef.triggerTimestamp = TimeUtils.getNextTimestampByCronExpression(cronExpression, TimeUtils.currentTimeMillis()); + schedulerDef.triggerTimestamp = TimeUtils.nextTimestampByCronExpression(cronExpression, TimeUtils.currentTimeMillis()); return schedulerDef; } diff --git a/scheduler/src/main/java/com/zfoo/scheduler/timeWheelUtils/Bucket.java b/scheduler/src/main/java/com/zfoo/scheduler/timeWheelUtils/Bucket.java deleted file mode 100644 index f41cf580..00000000 --- a/scheduler/src/main/java/com/zfoo/scheduler/timeWheelUtils/Bucket.java +++ /dev/null @@ -1,98 +0,0 @@ -package com.zfoo.scheduler.timeWheelUtils; - -import java.util.concurrent.Delayed; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicLong; -import java.util.function.Consumer; - -/** - * 时间槽 - */ -public class Bucket implements Delayed { - - /** - * 过期时间 - */ - private AtomicLong expiration = new AtomicLong(-1L); - - /** - * 傀儡节点 - */ - private TimerTask root = new TimerTask(-1L, null); - - { - root.pre = root; - root.next = root; - } - - /** - * 设置过期时间 - */ - public boolean setExpiration(long expire) { - return expiration.getAndSet(expire) != expire; - } - - /** - * 获取过期时间 - */ - public long getExpiration() { - return expiration.get(); - } - - /** - * 新增任务,加入链表尾部 - */ - public void addTask(TimerTask timerTask) { - synchronized (this) { - if (timerTask.bucket == null) { - timerTask.bucket = this; - TimerTask tail = root.pre; - timerTask.next = root; - timerTask.pre = tail; - tail.next = timerTask; - root.pre = timerTask; - } - } - } - - /** - * 移除任务 - */ - public void removeTask(TimerTask timerTask) { - synchronized (this) { - if (timerTask.bucket.equals(this)) { - timerTask.next.pre = timerTask.pre; - timerTask.pre.next = timerTask.next; - timerTask.bucket = null; - timerTask.next = null; - timerTask.pre = null; - } - } - } - - /** - * 重新分配 - */ - public void flush(Consumer flush) { - TimerTask timerTask = root.next; - while (!timerTask.equals(root)) { - this.removeTask(timerTask); - flush.accept(timerTask); - timerTask = root.next; - } - expiration.set(-1L); - } - - @Override - public long getDelay(TimeUnit unit) { - return Math.max(0, unit.convert(expiration.get() - System.currentTimeMillis(), TimeUnit.MILLISECONDS)); - } - - @Override - public int compareTo(Delayed o) { - if (o instanceof Bucket) { - return Long.compare(expiration.get(), ((Bucket) o).expiration.get()); - } - return 0; - } -} diff --git a/scheduler/src/main/java/com/zfoo/scheduler/timeWheelUtils/TimeWheel.java b/scheduler/src/main/java/com/zfoo/scheduler/timeWheelUtils/TimeWheel.java deleted file mode 100644 index 91b94b7b..00000000 --- a/scheduler/src/main/java/com/zfoo/scheduler/timeWheelUtils/TimeWheel.java +++ /dev/null @@ -1,112 +0,0 @@ -package com.zfoo.scheduler.timeWheelUtils; - -import java.util.concurrent.DelayQueue; - -/** - * 时间轮 - */ -public class TimeWheel { - - /** - * 一个时间槽的范围 - */ - private long tickMs; - - /** - * 时间轮大小 - */ - private int wheelSize; - - /** - * 时间轮的范围 - */ - private long interval; - - /** - * 时间槽 - */ - private Bucket[] buckets; - - /** - * 当前时间 - */ - private long currentTime; - - /** - * 上层时间轮 - */ - private volatile TimeWheel overflowWheel; - - /** - * 一个Timer只有一个delayQueue - */ - private DelayQueue delayQueue; - - public TimeWheel(long tickMs, int wheelSize, long currentTime, DelayQueue delayQueue) { - this.currentTime = currentTime; - this.tickMs = tickMs; - this.wheelSize = wheelSize; - this.interval = tickMs * wheelSize; - this.buckets = new Bucket[wheelSize]; - //currentTime为tickMs的整数倍 这里做取整操作 - this.currentTime = currentTime - (currentTime % tickMs); - this.delayQueue = delayQueue; - for (int i = 0; i < wheelSize; i++) { - buckets[i] = new Bucket(); - } - } - - /** - * 创建或者获取上层时间轮 - */ - private TimeWheel getOverflowWheel() { - if (overflowWheel == null) { - synchronized (this) { - if (overflowWheel == null) { - overflowWheel = new TimeWheel(interval, wheelSize, currentTime, delayQueue); - } - } - } - return overflowWheel; - } - - /** - * 添加任务到时间轮 - */ - public boolean addTask(TimerTask timerTask) { - long expiration = timerTask.getDelayMs(); - //过期任务直接执行 - if (expiration < currentTime + tickMs) { - return false; - } else if (expiration < currentTime + interval) { - //当前时间轮可以容纳该任务 加入时间槽 - Long bound = expiration / tickMs; - int bucketIndex = (int) (bound % wheelSize); - //System.out.println("tickMs:" + tickMs + "------bucketIndex:" + bucketIndex + "------expiration:" + expiration); - Bucket bucket = buckets[bucketIndex]; - bucket.addTask(timerTask); - if (bucket.setExpiration(bound * tickMs)) { - //添加到delayQueue中 - delayQueue.offer(bucket); - } - } else { - //放到上一层的时间轮 - TimeWheel timeWheel = getOverflowWheel(); - timeWheel.addTask(timerTask); - } - return true; - } - - /** - * 推进时间 - */ - public void advanceClock(long timestamp) { - if (timestamp >= currentTime + tickMs) { - currentTime = timestamp - (timestamp % tickMs); - if (overflowWheel != null) { - //推进上层时间轮时间 - this.getOverflowWheel().advanceClock(timestamp); - } - } - } -} diff --git a/scheduler/src/main/java/com/zfoo/scheduler/timeWheelUtils/Timer.java b/scheduler/src/main/java/com/zfoo/scheduler/timeWheelUtils/Timer.java deleted file mode 100644 index 3a268dc9..00000000 --- a/scheduler/src/main/java/com/zfoo/scheduler/timeWheelUtils/Timer.java +++ /dev/null @@ -1,61 +0,0 @@ -package com.zfoo.scheduler.timeWheelUtils; - -import com.zfoo.scheduler.manager.SchedulerThreadFactory; -import com.zfoo.scheduler.util.TimeUtils; - -import java.util.concurrent.*; - -import static com.zfoo.scheduler.manager.SchedulerBus.executor; - -/** - * 定时器 - */ -public class Timer { - - /** - * 底层时间轮 - */ - private TimeWheel timeWheel; - - /** - * 一个Timer只有一个delayQueue - */ - private DelayQueue delayQueue = new DelayQueue<>(); - - - /** - * 构造函数 - */ - public Timer() { - timeWheel = new TimeWheel(1000, 20, TimeUtils.currentTimeMillis(), delayQueue); - } - - /** - * 添加任务 - */ - public void addTask(TimerTask timerTask) { - //添加失败任务直接执行 - if (!timeWheel.addTask(timerTask)) { - executor.submit(timerTask.getTask()); - } - } - - /** - * 获取过期任务 - */ - public void advanceClock(long timestamp) { - try { - //阻塞获取队头元素 - Bucket bucket = delayQueue.poll(timestamp, TimeUnit.MILLISECONDS); - - if (bucket != null) { - //推进时间 - timeWheel.advanceClock(bucket.getExpiration()); - //执行过期任务(包含降级操作) - bucket.flush(this::addTask); - } - } catch (Exception e) { - e.printStackTrace(); - } - } -} diff --git a/scheduler/src/main/java/com/zfoo/scheduler/timeWheelUtils/TimerTask.java b/scheduler/src/main/java/com/zfoo/scheduler/timeWheelUtils/TimerTask.java deleted file mode 100644 index 79794efb..00000000 --- a/scheduler/src/main/java/com/zfoo/scheduler/timeWheelUtils/TimerTask.java +++ /dev/null @@ -1,51 +0,0 @@ -package com.zfoo.scheduler.timeWheelUtils; - -/** - * 任务 - */ -public class TimerTask { - - /** - * 延迟时间 - */ - private long delayMs; - - /** - * 任务 - */ - private Runnable task; - - /** - * 时间槽 - */ - protected Bucket bucket; - - /** - * 下一个节点 - */ - protected TimerTask next; - - /** - * 上一个节点 - */ - protected TimerTask pre; - - - public TimerTask(long delayMs, Runnable task) { - this.delayMs = delayMs; - this.task = task; - this.bucket = null; - this.next = null; - this.pre = null; - } - - public Runnable getTask() { - return task; - } - - public long getDelayMs() { - return delayMs; - } - - -} diff --git a/scheduler/src/main/java/com/zfoo/scheduler/util/TimeUtils.java b/scheduler/src/main/java/com/zfoo/scheduler/util/TimeUtils.java index 32c25bfe..4f1b4af4 100644 --- a/scheduler/src/main/java/com/zfoo/scheduler/util/TimeUtils.java +++ b/scheduler/src/main/java/com/zfoo/scheduler/util/TimeUtils.java @@ -82,7 +82,7 @@ public abstract class TimeUtils { static { currentTimeMillis(); // 调用一下静态方法,使SchedulerBus静态代码块初始化 - //SchedulerBus.refreshMinTriggerTimestamp(); + SchedulerBus.refreshMinTriggerTimestamp(); } /** @@ -364,8 +364,13 @@ public abstract class TimeUtils { } // --------------------------------------cron表达式-------------------------------------- - public static long getNextTimestampByCronExpression(CronExpression expression, long currentTimestamp) { - var next = expression.next(ZonedDateTime.ofInstant(Instant.ofEpochMilli(currentTimestamp), DEFAULT_ZONE_ID)); + public static long nextTimestampByCronExpression(CronExpression expression, long currentTimestamp) { + var zonedDateTime = ZonedDateTime.ofInstant(Instant.ofEpochMilli(currentTimestamp), DEFAULT_ZONE_ID); + return nextTimestampByCronExpression(expression, zonedDateTime); + } + + public static long nextTimestampByCronExpression(CronExpression expression, ZonedDateTime zonedDateTime) { + var next = expression.next(zonedDateTime); if (next == null) { return Long.MAX_VALUE;