fix[scheduler]:线程池优化

This commit is contained in:
islandempty
2021-10-10 14:46:12 +08:00
parent b31186c8d3
commit 9ea2a5cbbc
4 changed files with 18 additions and 27 deletions
@@ -35,11 +35,21 @@ public abstract class SchedulerBus {
private static Timer timer = new Timer();
public static final long TRIGGER_MILLIS_INTERVAL = TimeUtils.MILLIS_PER_SECOND;
/**
* scheduler默认只有一个单线程的线程池
*/
private static final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(new SchedulerThreadFactory(1));
public static final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(new SchedulerThreadFactory(1));
static {
executor.scheduleAtFixedRate(() -> {
try {
timer.advanceClock(1);
} catch (Exception e) {
logger.error("scheduler triggers an error.", e);
}
}, 0, TRIGGER_MILLIS_INTERVAL, TimeUnit.MILLISECONDS);
}
public static void registerScheduler(SchedulerDefinition scheduler) {
var timerTask = new TimerTask(scheduler.getTriggerTimestamp(), () -> {
@@ -73,7 +73,7 @@ public class Bucket implements Delayed {
/**
* 重新分配
*/
public synchronized void flush(Consumer<TimerTask> flush) {
public void flush(Consumer<TimerTask> flush) {
TimerTask timerTask = root.next;
while (!timerTask.equals(root)) {
this.removeTask(timerTask);
@@ -77,7 +77,6 @@ public class TimeWheel {
long expiration = timerTask.getDelayMs();
//过期任务直接执行
if (expiration < currentTime + tickMs) {
//TODO这里可以直接执行定时任务
return false;
} else if (expiration < currentTime + interval) {
//当前时间轮可以容纳该任务 加入时间槽
@@ -1,11 +1,11 @@
package com.zfoo.scheduler.timeWheelUtils;
import com.zfoo.scheduler.manager.SchedulerThreadFactory;
import com.zfoo.scheduler.util.TimeUtils;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.*;
import static com.zfoo.scheduler.manager.SchedulerBus.executor;
/**
* 定时器
@@ -22,30 +22,12 @@ public class Timer {
*/
private DelayQueue<Bucket> delayQueue = new DelayQueue<>();
/**
* 过期任务执行线程
*/
private ExecutorService workerThreadPool;
/**
* 轮询delayQueue获取过期任务线程
*/
private ExecutorService bossThreadPool;
/**
* 构造函数
*/
public Timer() {
timeWheel = new TimeWheel(1000, 20, TimeUtils.currentTimeMillis(), delayQueue);
bossThreadPool = Executors.newFixedThreadPool(1);
workerThreadPool = Executors.newFixedThreadPool(10);
//20ms获取一次过期任务
bossThreadPool.submit(() -> {
while (true) {
this.advanceClock(20);
}
});
}
/**
@@ -54,14 +36,14 @@ public class Timer {
public void addTask(TimerTask timerTask) {
//添加失败任务直接执行
if (!timeWheel.addTask(timerTask)) {
workerThreadPool.submit(timerTask.getTask());
executor.submit(timerTask.getTask());
}
}
/**
* 获取过期任务
*/
private void advanceClock(long timestamp) {
public void advanceClock(long timestamp) {
try {
//阻塞获取队头元素
Bucket bucket = delayQueue.poll(timestamp, TimeUnit.MILLISECONDS);