diff --git a/scheduler/src/main/java/com/zfoo/scheduler/manager/SchedulerBus.java b/scheduler/src/main/java/com/zfoo/scheduler/manager/SchedulerBus.java index 416c93e2..ea876af8 100644 --- a/scheduler/src/main/java/com/zfoo/scheduler/manager/SchedulerBus.java +++ b/scheduler/src/main/java/com/zfoo/scheduler/manager/SchedulerBus.java @@ -35,11 +35,21 @@ public abstract class SchedulerBus { private static Timer timer = new Timer(); + public static final long TRIGGER_MILLIS_INTERVAL = TimeUtils.MILLIS_PER_SECOND; /** * scheduler默认只有一个单线程的线程池 */ - private static final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(new SchedulerThreadFactory(1)); + public static final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(new SchedulerThreadFactory(1)); + static { + executor.scheduleAtFixedRate(() -> { + try { + timer.advanceClock(1); + } catch (Exception e) { + logger.error("scheduler triggers an error.", e); + } + }, 0, TRIGGER_MILLIS_INTERVAL, TimeUnit.MILLISECONDS); + } public static void registerScheduler(SchedulerDefinition scheduler) { var timerTask = new TimerTask(scheduler.getTriggerTimestamp(), () -> { diff --git a/scheduler/src/main/java/com/zfoo/scheduler/timeWheelUtils/Bucket.java b/scheduler/src/main/java/com/zfoo/scheduler/timeWheelUtils/Bucket.java index b3aaf474..f41cf580 100644 --- a/scheduler/src/main/java/com/zfoo/scheduler/timeWheelUtils/Bucket.java +++ b/scheduler/src/main/java/com/zfoo/scheduler/timeWheelUtils/Bucket.java @@ -73,7 +73,7 @@ public class Bucket implements Delayed { /** * 重新分配 */ - public synchronized void flush(Consumer flush) { + public void flush(Consumer flush) { TimerTask timerTask = root.next; while (!timerTask.equals(root)) { this.removeTask(timerTask); diff --git a/scheduler/src/main/java/com/zfoo/scheduler/timeWheelUtils/TimeWheel.java b/scheduler/src/main/java/com/zfoo/scheduler/timeWheelUtils/TimeWheel.java index a18e9477..91b94b7b 100644 --- a/scheduler/src/main/java/com/zfoo/scheduler/timeWheelUtils/TimeWheel.java +++ b/scheduler/src/main/java/com/zfoo/scheduler/timeWheelUtils/TimeWheel.java @@ -77,7 +77,6 @@ public class TimeWheel { long expiration = timerTask.getDelayMs(); //过期任务直接执行 if (expiration < currentTime + tickMs) { - //TODO这里可以直接执行定时任务 return false; } else if (expiration < currentTime + interval) { //当前时间轮可以容纳该任务 加入时间槽 diff --git a/scheduler/src/main/java/com/zfoo/scheduler/timeWheelUtils/Timer.java b/scheduler/src/main/java/com/zfoo/scheduler/timeWheelUtils/Timer.java index 290914e8..3a268dc9 100644 --- a/scheduler/src/main/java/com/zfoo/scheduler/timeWheelUtils/Timer.java +++ b/scheduler/src/main/java/com/zfoo/scheduler/timeWheelUtils/Timer.java @@ -1,11 +1,11 @@ package com.zfoo.scheduler.timeWheelUtils; +import com.zfoo.scheduler.manager.SchedulerThreadFactory; import com.zfoo.scheduler.util.TimeUtils; -import java.util.concurrent.DelayQueue; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.*; + +import static com.zfoo.scheduler.manager.SchedulerBus.executor; /** * 定时器 @@ -22,30 +22,12 @@ public class Timer { */ private DelayQueue delayQueue = new DelayQueue<>(); - /** - * 过期任务执行线程 - */ - private ExecutorService workerThreadPool; - - /** - * 轮询delayQueue获取过期任务线程 - */ - private ExecutorService bossThreadPool; /** * 构造函数 */ public Timer() { timeWheel = new TimeWheel(1000, 20, TimeUtils.currentTimeMillis(), delayQueue); - bossThreadPool = Executors.newFixedThreadPool(1); - workerThreadPool = Executors.newFixedThreadPool(10); - - //20ms获取一次过期任务 - bossThreadPool.submit(() -> { - while (true) { - this.advanceClock(20); - } - }); } /** @@ -54,14 +36,14 @@ public class Timer { public void addTask(TimerTask timerTask) { //添加失败任务直接执行 if (!timeWheel.addTask(timerTask)) { - workerThreadPool.submit(timerTask.getTask()); + executor.submit(timerTask.getTask()); } } /** * 获取过期任务 */ - private void advanceClock(long timestamp) { + public void advanceClock(long timestamp) { try { //阻塞获取队头元素 Bucket bucket = delayQueue.poll(timestamp, TimeUnit.MILLISECONDS);