perf[scheduler]: 让cron表达式在其它模块启动完成过后再执行,减少重复的遍历提升了一点性能

This commit is contained in:
jaysunxiao
2021-09-18 10:51:45 +08:00
parent d72fcbd5de
commit af463c656c
6 changed files with 81 additions and 124 deletions
@@ -13,7 +13,6 @@
package com.zfoo.boot;
import com.zfoo.scheduler.SchedulerContext;
import com.zfoo.scheduler.schema.SchedulerRegisterProcessor;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@@ -25,12 +24,6 @@ import org.springframework.context.annotation.Configuration;
@Configuration(proxyBeanMethods = false)
public class SchedulerAutoConfiguration {
@Bean
@ConditionalOnMissingBean
public SchedulerRegisterProcessor schedulerRegisterProcessor() {
return new SchedulerRegisterProcessor();
}
@Bean
@ConditionalOnMissingBean
public SchedulerContext schedulerContext() {
@@ -13,8 +13,12 @@
package com.zfoo.scheduler;
import com.zfoo.protocol.collection.ArrayUtils;
import com.zfoo.protocol.util.ReflectionUtils;
import com.zfoo.protocol.util.StringUtils;
import com.zfoo.scheduler.manager.SchedulerBus;
import com.zfoo.scheduler.model.anno.Scheduler;
import com.zfoo.scheduler.model.vo.SchedulerDefinition;
import com.zfoo.util.ThreadUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -26,6 +30,7 @@ import org.springframework.context.event.ContextRefreshedEvent;
import org.springframework.core.Ordered;
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.util.concurrent.ScheduledExecutorService;
/**
@@ -83,11 +88,61 @@ public class SchedulerContext implements ApplicationListener<ApplicationContextE
// 初始化上下文
SchedulerContext.instance = this;
instance.applicationContext = event.getApplicationContext();
inject();
} else if (event instanceof ContextClosedEvent) {
shutdown();
}
}
public void inject() {
var beanNames = applicationContext.getBeanDefinitionNames();
for (var beanName : beanNames) {
var bean = applicationContext.getBean(beanName);
var clazz = bean.getClass();
var methods = ReflectionUtils.getMethodsByAnnoInPOJOClass(bean.getClass(), Scheduler.class);
if (ArrayUtils.isEmpty(methods)) {
continue;
}
if (!ReflectionUtils.isPojoClass(clazz)) {
logger.warn("调度注册类[{}]不是POJO类,父类的调度不会被扫描到", clazz);
}
try {
for (var method : methods) {
var schedulerMethod = method.getAnnotation(Scheduler.class);
var paramClazzs = method.getParameterTypes();
if (paramClazzs.length >= 1) {
throw new IllegalArgumentException(StringUtils.format("[class:{}] [method:{}] can not have any parameters", bean.getClass(), method.getName()));
}
var methodName = method.getName();
if (!Modifier.isPublic(method.getModifiers())) {
throw new IllegalArgumentException(StringUtils.format("[class:{}] [method:{}] must use 'public' as modifier!", bean.getClass().getName(), methodName));
}
if (Modifier.isStatic(method.getModifiers())) {
throw new IllegalArgumentException(StringUtils.format("[class:{}] [method:{}] can not use 'static' as modifier!", bean.getClass().getName(), methodName));
}
if (!methodName.startsWith("cron")) {
throw new IllegalArgumentException(StringUtils.format("[class:{}] [method:{}] must start with 'cron' as method name!"
, bean.getClass().getName(), methodName));
}
var scheduler = SchedulerDefinition.valueOf(schedulerMethod.cron(), bean, method);
SchedulerBus.registerScheduler(scheduler);
}
} catch (Throwable t) {
throw new RuntimeException(t);
}
}
}
@Override
public int getOrder() {
return Ordered.LOWEST_PRECEDENCE;
@@ -14,14 +14,12 @@
package com.zfoo.scheduler.manager;
import com.zfoo.protocol.collection.CollectionUtils;
import com.zfoo.protocol.util.JsonUtils;
import com.zfoo.scheduler.SchedulerContext;
import com.zfoo.scheduler.model.vo.SchedulerDefinition;
import com.zfoo.scheduler.util.TimeUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executors;
@@ -41,13 +39,13 @@ public abstract class SchedulerBus {
/**
* 上一次trigger触发时间
*/
private static long lastTriggerTimestamp = 0;
private static long lastTriggerTimestamp = 0L;
/**
* 在scheduler中,最小的triggerTimestamp
*/
private static long minSchedulerTriggerTimestamp = 0;
private static long minTriggerTimestamp = 0L;
public static final long TRIGGER_MILLIS_INTERVAL = TimeUtils.MILLIS_PER_SECOND;
@@ -65,18 +63,18 @@ public abstract class SchedulerBus {
} catch (Exception e) {
logger.error("scheduler triggers an error.", e);
}
}, 7 * TimeUtils.MILLIS_PER_SECOND, TRIGGER_MILLIS_INTERVAL, TimeUnit.MILLISECONDS);
}, 0, TRIGGER_MILLIS_INTERVAL, TimeUnit.MILLISECONDS);
}
private static long minSchedulerTriggerTimestamp() {
var minSchedulerOptional = schedulerDefList.stream().min(Comparator.comparingLong(schedulerDef -> schedulerDef.getTriggerTimestamp()));
if (minSchedulerOptional.isPresent()) {
return minSchedulerOptional.get().getTriggerTimestamp();
} else {
logger.error("schedulerDefList:[{}] has no minSchedulerTriggerTimestamp to return. ", JsonUtils.object2String(schedulerDefList));
return 0;
public static void refreshMinTriggerTimestamp() {
var minTimestamp = Long.MAX_VALUE;
for (var scheduler : schedulerDefList) {
if (scheduler.getTriggerTimestamp() < minTimestamp) {
minTimestamp = scheduler.getTriggerTimestamp();
}
}
minTriggerTimestamp = minTimestamp;
}
/**
@@ -97,32 +95,36 @@ public abstract class SchedulerBus {
var nextTriggerTimestamp = TimeUtils.getNextTimestampByCronExpression(schedulerDef.getCronExpression(), timestamp);
schedulerDef.setTriggerTimestamp(nextTriggerTimestamp);
}
minSchedulerTriggerTimestamp = minSchedulerTriggerTimestamp();
refreshMinTriggerTimestamp();
}
// diff > 0, 没有人调整时间或者有人向后调整过机器时间,可以忽略,因为向后调整时间时间戳一定会大于triggerTimestamp,所以一定会触发
lastTriggerTimestamp = timestamp;
// 如果minSchedulerTriggerTimestamp大于timestamp,说明没有可执行的scheduler
if (timestamp < minSchedulerTriggerTimestamp) {
if (timestamp < minTriggerTimestamp) {
return;
}
for (var schedulerDef : schedulerDefList) {
if (timestamp >= schedulerDef.getTriggerTimestamp()) {
var minTimestamp = Long.MAX_VALUE;
for (var scheduler : schedulerDefList) {
if (timestamp >= scheduler.getTriggerTimestamp()) {
// 到达触发时间,则执行runnable方法
schedulerDef.getScheduler().invoke();
scheduler.getScheduler().invoke();
// 重新设置下一次的触发时间戳
var nextTriggerTimestamp = TimeUtils.getNextTimestampByCronExpression(schedulerDef.getCronExpression(), timestamp);
schedulerDef.setTriggerTimestamp(nextTriggerTimestamp);
var nextTriggerTimestamp = TimeUtils.getNextTimestampByCronExpression(scheduler.getCronExpression(), timestamp);
scheduler.setTriggerTimestamp(nextTriggerTimestamp);
}
if (scheduler.getTriggerTimestamp() < minTimestamp) {
minTimestamp = scheduler.getTriggerTimestamp();
}
}
minSchedulerTriggerTimestamp = minSchedulerTriggerTimestamp();
minTriggerTimestamp = minTimestamp;
}
public static void registerScheduler(SchedulerDefinition scheduler) {
schedulerDefList.add(scheduler);
minSchedulerTriggerTimestamp = minSchedulerTriggerTimestamp();
refreshMinTriggerTimestamp();
}
@@ -41,12 +41,6 @@ public class SchedulerDefinitionParser implements BeanDefinitionParser {
builder = BeanDefinitionBuilder.rootBeanDefinition(clazz);
parserContext.getRegistry().registerBeanDefinition(name, builder.getBeanDefinition());
// 注册SchedulerRegisterProcessor
clazz = SchedulerRegisterProcessor.class;
name = StringUtils.uncapitalize(clazz.getName());
builder = BeanDefinitionBuilder.rootBeanDefinition(clazz);
parserContext.getRegistry().registerBeanDefinition(name, builder.getBeanDefinition());
return builder.getBeanDefinition();
}
@@ -1,83 +0,0 @@
/*
* Copyright (C) 2020 The zfoo Authors
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and limitations under the License.
*/
package com.zfoo.scheduler.schema;
import com.zfoo.protocol.collection.ArrayUtils;
import com.zfoo.protocol.util.ReflectionUtils;
import com.zfoo.protocol.util.StringUtils;
import com.zfoo.scheduler.manager.SchedulerBus;
import com.zfoo.scheduler.model.anno.Scheduler;
import com.zfoo.scheduler.model.vo.SchedulerDefinition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.config.BeanPostProcessor;
import java.lang.reflect.Modifier;
/**
* @author jaysunxiao
* @version 3.0
*/
public class SchedulerRegisterProcessor implements BeanPostProcessor {
private static final Logger logger = LoggerFactory.getLogger(SchedulerRegisterProcessor.class);
@Override
public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
var clazz = bean.getClass();
var methods = ReflectionUtils.getMethodsByAnnoInPOJOClass(bean.getClass(), Scheduler.class);
if (ArrayUtils.isEmpty(methods)) {
return bean;
}
if (!ReflectionUtils.isPojoClass(clazz)) {
logger.warn("调度注册类[{}]不是POJO类,父类的调度不会被扫描到", clazz);
}
try {
for (var method : methods) {
var schedulerMethod = method.getAnnotation(Scheduler.class);
var paramClazzs = method.getParameterTypes();
if (paramClazzs.length >= 1) {
throw new IllegalArgumentException(StringUtils.format("[class:{}] [method:{}] can not have any parameters", bean.getClass(), method.getName()));
}
var methodName = method.getName();
if (!Modifier.isPublic(method.getModifiers())) {
throw new IllegalArgumentException(StringUtils.format("[class:{}] [method:{}] must use 'public' as modifier!", bean.getClass().getName(), methodName));
}
if (Modifier.isStatic(method.getModifiers())) {
throw new IllegalArgumentException(StringUtils.format("[class:{}] [method:{}] can not use 'static' as modifier!", bean.getClass().getName(), methodName));
}
if (!methodName.startsWith("cron")) {
throw new IllegalArgumentException(StringUtils.format("[class:{}] [method:{}] must start with 'cron' as method name!"
, bean.getClass().getName(), methodName));
}
var scheduler = SchedulerDefinition.valueOf(schedulerMethod.cron(), bean, method);
SchedulerBus.registerScheduler(scheduler);
}
} catch (Throwable t) {
throw new RuntimeException(t);
}
return bean;
}
}
@@ -24,7 +24,6 @@ import java.time.temporal.TemporalAdjusters;
import java.util.Calendar;
import java.util.Date;
import java.util.TimeZone;
import java.util.concurrent.TimeUnit;
/**
* @author jaysunxiao
@@ -81,12 +80,9 @@ public abstract class TimeUtils {
};
static {
SchedulerBus.schedule(new Runnable() {
@Override
public void run() {
currentTimeMillis();
}
}, 0, TimeUnit.SECONDS);
currentTimeMillis();
// 调用一下静态方法,使SchedulerBus静态代码块初始化
SchedulerBus.refreshMinTriggerTimestamp();
}
/**