perf[net]: 在task,event,scheduler线程执行的异步请求,请求成功过后依然在相同的线程执行回调任务

This commit is contained in:
godotg
2022-07-28 15:35:44 +08:00
parent 412592110b
commit 004d96a3dd
21 changed files with 202 additions and 230 deletions
@@ -16,7 +16,11 @@ package com.zfoo.event.manager;
import com.zfoo.event.model.event.IEvent;
import com.zfoo.event.model.vo.IEventReceiver;
import com.zfoo.protocol.collection.CollectionUtils;
import com.zfoo.protocol.util.AssertionUtils;
import com.zfoo.protocol.util.StringUtils;
import com.zfoo.util.SafeRunnable;
import com.zfoo.util.math.RandomUtils;
import io.netty.util.concurrent.FastThreadLocalThread;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -24,12 +28,11 @@ import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
/**
* @author jaysunxiao
* @author godotg
* @version 3.0
*/
public abstract class EventBus {
@@ -43,12 +46,43 @@ public abstract class EventBus {
private static final ExecutorService[] executors = new ExecutorService[EXECUTORS_SIZE];
private static final Map<Long, ExecutorService> threadMap = new ConcurrentHashMap<>();
private static final Map<Class<? extends IEvent>, List<IEventReceiver>> receiverMap = new HashMap<>();
static {
for (int i = 0; i < executors.length; i++) {
var namedThreadFactory = new EventThreadFactory(i + 1);
executors[i] = Executors.newSingleThreadExecutor(namedThreadFactory);
var executor = Executors.newSingleThreadExecutor(namedThreadFactory);
namedThreadFactory.executor = executor;
executors[i] = executor;
}
}
public static class EventThreadFactory implements ThreadFactory {
public ExecutorService executor;
private int poolNumber;
private AtomicInteger threadNumber = new AtomicInteger(1);
private ThreadGroup group;
public EventThreadFactory(int poolNumber) {
var s = System.getSecurityManager();
group = (s != null) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup();
this.poolNumber = poolNumber;
}
@Override
public Thread newThread(Runnable runnable) {
var threadName = StringUtils.format("event-p{}-t{}", poolNumber, threadNumber.getAndIncrement());
var thread = new FastThreadLocalThread(group, runnable, threadName, 0);
thread.setDaemon(false);
thread.setPriority(Thread.NORM_PRIORITY);
thread.setUncaughtExceptionHandler((t, e) -> logger.error(t.toString(), e));
AssertionUtils.notNull(executor);
threadMap.put(thread.getId(), executor);
return thread;
}
}
@@ -77,19 +111,11 @@ public abstract class EventBus {
return;
}
executors[Math.abs(event.threadId() % EXECUTORS_SIZE)].execute(new Runnable() {
@Override
public void run() {
doSubmit(event, list);
}
});
executors[Math.abs(event.threadId() % EXECUTORS_SIZE)].execute(() -> doSubmit(event, list));
}
/**
* 随机获取一个线程
*/
public static Executor asyncExecute() {
return executors[RandomUtils.randomInt(EXECUTORS_SIZE)];
public static void asyncExecute(SafeRunnable runnable) {
execute(RandomUtils.randomInt(EXECUTORS_SIZE), runnable);
}
/**
@@ -98,8 +124,8 @@ public abstract class EventBus {
* @param hashcode
* @return
*/
public static Executor execute(int hashcode) {
return executors[Math.abs(hashcode % EXECUTORS_SIZE)];
public static void execute(int hashcode, SafeRunnable runnable) {
executors[Math.abs(hashcode % EXECUTORS_SIZE)].execute(runnable);
}
/**
@@ -123,6 +149,7 @@ public abstract class EventBus {
/**
* 注册事件及其对应观察者
*
*
* @param eventType
* @param receiver
*/
@@ -130,6 +157,9 @@ public abstract class EventBus {
receiverMap.computeIfAbsent(eventType, it -> new LinkedList<>()).add(receiver);
}
public static Executor threadExecutor(long currentThreadId) {
return threadMap.get(currentThreadId);
}
}
@@ -1,50 +0,0 @@
/*
* Copyright (C) 2020 The zfoo Authors
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and limitations under the License.
*/
package com.zfoo.event.manager;
import io.netty.util.concurrent.FastThreadLocalThread;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;
/**
* @author jaysunxiao
* @version 3.0
*/
public class EventThreadFactory implements ThreadFactory {
private static final Logger logger = LoggerFactory.getLogger(EventThreadFactory.class);
private final ThreadGroup group;
private final AtomicInteger threadNumber = new AtomicInteger(1);
private final String namePrefix;
public EventThreadFactory(int poolNumber) {
var s = System.getSecurityManager();
group = (s != null) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup();
namePrefix = "event-p" + poolNumber + "-t";
}
@Override
public Thread newThread(Runnable runnable) {
var t = new FastThreadLocalThread(group, runnable, namePrefix + threadNumber.getAndIncrement(), 0);
t.setDaemon(false);
t.setPriority(Thread.NORM_PRIORITY);
t.setUncaughtExceptionHandler((thread, e) -> logger.error(thread.toString(), e));
return t;
}
}
@@ -29,6 +29,7 @@ import com.zfoo.protocol.util.IOUtils;
import com.zfoo.protocol.util.JsonUtils;
import com.zfoo.protocol.util.StringUtils;
import com.zfoo.scheduler.manager.SchedulerBus;
import com.zfoo.util.SafeRunnable;
import com.zfoo.util.ThreadUtils;
import com.zfoo.util.net.HostAndPort;
import io.netty.util.concurrent.FastThreadLocalThread;
@@ -66,7 +67,7 @@ import java.util.stream.Collectors;
/**
* 服务注册,服务发现
*
* @author jaysunxiao
* @author godotg
* @version 3.0
*/
public class ZookeeperRegistry implements IRegistry {
@@ -360,9 +361,9 @@ public class ZookeeperRegistry implements IRegistry {
} catch (Exception e) {
//
logger.error("zookeeper初始化失败,等待[{}]秒,重新初始化", RETRY_SECONDS, e);
SchedulerBus.schedule(new Runnable() {
SchedulerBus.schedule(new SafeRunnable() {
@Override
public void run() {
public void doRun() {
initZookeeper();
}
}, RETRY_SECONDS, TimeUnit.SECONDS);
@@ -521,7 +522,12 @@ public class ZookeeperRegistry implements IRegistry {
}
if (recheckFlag) {
SchedulerBus.schedule(() -> checkConsumer(), RETRY_SECONDS, TimeUnit.SECONDS);
SchedulerBus.schedule(new SafeRunnable() {
@Override
public void doRun() {
checkConsumer();
}
}, RETRY_SECONDS, TimeUnit.SECONDS);
}
}
@@ -14,15 +14,15 @@
package com.zfoo.net.router.answer;
import com.zfoo.net.router.attachment.SignalAttachment;
import com.zfoo.net.task.model.SafeRunnable;
import com.zfoo.protocol.IPacket;
import com.zfoo.util.SafeRunnable;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
/**
* @author jaysunxiao
* @author godotg
* @version 3.0
*/
public class AsyncAnswer<T extends IPacket> implements IAsyncAnswer<T> {
@@ -13,13 +13,13 @@
package com.zfoo.net.router.answer;
import com.zfoo.net.task.model.SafeRunnable;
import com.zfoo.protocol.IPacket;
import com.zfoo.util.SafeRunnable;
import java.util.function.Consumer;
/**
* @author jaysunxiao
* @author godotg
* @version 3.0
*/
public interface IAsyncAnswer<T extends IPacket> {
@@ -13,27 +13,26 @@
package com.zfoo.net.task;
import com.zfoo.event.manager.EventBus;
import com.zfoo.net.NetContext;
import com.zfoo.net.task.dispatcher.AbstractTaskDispatch;
import com.zfoo.net.task.dispatcher.ITaskDispatch;
import com.zfoo.net.task.model.PacketReceiverTask;
import com.zfoo.protocol.util.AssertionUtils;
import com.zfoo.protocol.util.StringUtils;
import com.zfoo.util.math.HashUtils;
import com.zfoo.scheduler.manager.SchedulerBus;
import com.zfoo.util.SafeRunnable;
import com.zfoo.util.math.RandomUtils;
import io.netty.util.concurrent.FastThreadLocalThread;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
/**
* @author jaysunxiao
* @author godotg
* @version 3.0
*/
public final class TaskBus {
@@ -76,9 +75,9 @@ public final class TaskBus {
public ExecutorService executor;
private final int poolNumber;
private final AtomicInteger threadNumber = new AtomicInteger(1);
private final ThreadGroup group;
private int poolNumber;
private AtomicInteger threadNumber = new AtomicInteger(1);
private ThreadGroup group;
public TaskThreadFactory(int poolNumber) {
var s = System.getSecurityManager();
@@ -89,14 +88,13 @@ public final class TaskBus {
@Override
public Thread newThread(Runnable runnable) {
var threadName = StringUtils.format("task-p{}-t{}", poolNumber, threadNumber.getAndIncrement());
var t = new FastThreadLocalThread(group, runnable, threadName, 0);
t.setDaemon(false);
t.setPriority(Thread.NORM_PRIORITY);
t.setUncaughtExceptionHandler((thread, e) -> logger.error(thread.toString(), e));
var thread = new FastThreadLocalThread(group, runnable, threadName, 0);
thread.setDaemon(false);
thread.setPriority(Thread.NORM_PRIORITY);
thread.setUncaughtExceptionHandler((t, e) -> logger.error(t.toString(), e));
AssertionUtils.notNull(executor);
var threadId = t.getId();
threadMap.put(threadId, executor);
return t;
threadMap.put(thread.getId(), executor);
return thread;
}
}
@@ -120,22 +118,36 @@ public final class TaskBus {
*/
public static void submit(PacketReceiverTask task) {
// 里面会看到是:其中一致性hash是根据附加包记录的hashId进行选择哪个线程进行业务处理
taskDispatch.getExecutor(task).execute(task);
taskDispatch.getExecutor(executors, task).execute(task);
}
public static ExecutorService executor(Object hashObj) {
return executors[Math.abs(HashUtils.fnvHash(hashObj.hashCode()) % EXECUTOR_SIZE)];
public static int executorIndex(int executorConsistentHash) {
return Math.abs(executorConsistentHash % EXECUTOR_SIZE);
}
// 在task线程的异步请求,请求成功过后依然在相同的task线程执行回调任务
public static ExecutorService currentThreadExecutor() {
public static void executor(int executorConsistentHash, SafeRunnable runnable) {
executors[executorIndex(executorConsistentHash)].execute(runnable);
}
// 在taskeventscheduler线程执行的异步请求,请求成功过后依然在相同的线程执行回调任务
public static Executor currentThreadExecutor() {
var threadId = Thread.currentThread().getId();
var executor = threadMap.get(threadId);
if (executor == null) {
logger.error("threadId:[{}]找不到对应的executor", threadId);
return executor(RandomUtils.randomInt());
var taskExecutor = threadMap.get(threadId);
if (taskExecutor != null) {
return taskExecutor;
}
return executor;
var eventExecutor = EventBus.threadExecutor(threadId);
if (eventExecutor != null) {
return eventExecutor;
}
var schedulerExecutor = SchedulerBus.threadExecutor(threadId);
if (schedulerExecutor != null) {
return schedulerExecutor;
}
return executors[executorIndex(RandomUtils.randomInt())];
}
}
@@ -16,11 +16,13 @@ package com.zfoo.net.task.dispatcher;
import com.zfoo.net.session.model.AttributeType;
import com.zfoo.net.task.TaskBus;
import com.zfoo.net.task.model.PacketReceiverTask;
import com.zfoo.util.math.HashUtils;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
/**
* @author jaysunxiao
* @author godotg
* @version 3.0
*/
public class ConsistentHashTaskDispatch extends AbstractTaskDispatch {
@@ -32,22 +34,22 @@ public class ConsistentHashTaskDispatch extends AbstractTaskDispatch {
}
@Override
public ExecutorService getExecutor(PacketReceiverTask packetReceiverTask) {
public Executor getExecutor(ExecutorService[] executors, PacketReceiverTask packetReceiverTask) {
var attachment = packetReceiverTask.getAttachment();
if (attachment == null) {
var session = packetReceiverTask.getSession();
Long uid = session.getAttribute(AttributeType.UID);
var uid = session.getAttribute(AttributeType.UID);
if (uid == null) {
return SessionIdTaskDispatch.getInstance().getExecutor(packetReceiverTask);
return SessionIdTaskDispatch.getInstance().getExecutor(executors, packetReceiverTask);
} else {
return TaskBus.executor(uid);
return executors[TaskBus.executorIndex(HashUtils.fnvHash(uid))];
}
}
// 可见最终是根据附加包的信息选择服务端由哪个线程执行这个业务
return TaskBus.executor(attachment.executorConsistentHash());
return executors[TaskBus.executorIndex(attachment.executorConsistentHash())];
}
}
@@ -15,14 +15,15 @@ package com.zfoo.net.task.dispatcher;
import com.zfoo.net.task.model.PacketReceiverTask;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
/**
* @author jaysunxiao
* @author godotg
* @version 3.0
*/
public interface ITaskDispatch {
ExecutorService getExecutor(PacketReceiverTask packetReceiverTask);
Executor getExecutor(ExecutorService[] executors, PacketReceiverTask packetReceiverTask);
}
@@ -15,11 +15,13 @@ package com.zfoo.net.task.dispatcher;
import com.zfoo.net.task.TaskBus;
import com.zfoo.net.task.model.PacketReceiverTask;
import com.zfoo.util.math.RandomUtils;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
/**
* @author jaysunxiao
* @author godotg
* @version 3.0
*/
public class RandomTaskDispatch extends AbstractTaskDispatch {
@@ -31,8 +33,8 @@ public class RandomTaskDispatch extends AbstractTaskDispatch {
}
@Override
public ExecutorService getExecutor(PacketReceiverTask packetReceiverTask) {
return TaskBus.executor(-1);
public Executor getExecutor(ExecutorService[] executors, PacketReceiverTask packetReceiverTask) {
return executors[TaskBus.executorIndex(RandomUtils.randomInt())];
}
}
@@ -17,12 +17,13 @@ import com.zfoo.net.task.TaskBus;
import com.zfoo.net.task.model.PacketReceiverTask;
import com.zfoo.util.math.HashUtils;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
/**
* 同一个session总是分配到同一个线程池执行
*
* @author jaysunxiao
* @author godotg
* @version 3.0
*/
public class SessionIdTaskDispatch extends AbstractTaskDispatch {
@@ -34,9 +35,9 @@ public class SessionIdTaskDispatch extends AbstractTaskDispatch {
}
@Override
public ExecutorService getExecutor(PacketReceiverTask packetReceiverTask) {
public Executor getExecutor(ExecutorService[] executors, PacketReceiverTask packetReceiverTask) {
var session = packetReceiverTask.getSession();
return TaskBus.executor(session.getSid());
return executors[TaskBus.executorIndex(HashUtils.fnvHash(session.getSid()))];
}
}
@@ -20,6 +20,7 @@ import com.zfoo.event.manager.EventBus;
import com.zfoo.protocol.collection.CollectionUtils;
import com.zfoo.protocol.model.Pair;
import com.zfoo.scheduler.manager.SchedulerBus;
import com.zfoo.util.SafeRunnable;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.checkerframework.checker.nullness.qual.Nullable;
@@ -38,7 +39,7 @@ import java.util.function.Function;
* <p>
* 批量查找通过batchLoadCallback方法查找,当查找的key不存在的时候,调用defaultValueBuilder生成一个默认值放入缓存。
*
* @author jaysunxiao
* @author godotg
* @version 3.0
*/
public class SimpleCache<K, V> {
@@ -93,13 +94,13 @@ public class SimpleCache<K, V> {
});
SchedulerBus.scheduleAtFixedRate(new Runnable() {
SchedulerBus.scheduleAtFixedRate(new SafeRunnable() {
@Override
public void run() {
public void doRun() {
// 不在任务调度线程中执行耗时任务,因为任务调度线程只有一个线程池
EventBus.asyncExecute().execute(new Runnable() {
EventBus.asyncExecute(new SafeRunnable() {
@Override
public void run() {
public void doRun() {
var list = new ArrayList<K>();
while (!linkedQueue.isEmpty()) {
var key = linkedQueue.poll();
@@ -15,6 +15,7 @@ package com.zfoo.net.util;
import com.zfoo.event.manager.EventBus;
import com.zfoo.scheduler.util.TimeUtils;
import com.zfoo.util.SafeRunnable;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
@@ -23,7 +24,7 @@ import java.util.function.Supplier;
/**
* 单值缓存,会隔一段时间在后台刷新一下缓存
*
* @author jaysunxiao
* @author godotg
* @version 3.0
*/
public class SingleCache<V> {
@@ -61,9 +62,9 @@ public class SingleCache<V> {
try {
if (now > refreshTime) {
refreshTime = now + refreshDuration;
EventBus.asyncExecute().execute(new Runnable() {
EventBus.asyncExecute(new SafeRunnable() {
@Override
public void run() {
public void doRun() {
cache = supplier.get();
}
});
@@ -15,6 +15,7 @@ package com.zfoo.net.router;
import com.zfoo.event.manager.EventBus;
import com.zfoo.net.router.route.SignalBridge;
import com.zfoo.scheduler.util.TimeUtils;
import com.zfoo.util.SafeRunnable;
import com.zfoo.util.ThreadUtils;
import org.junit.Ignore;
import org.junit.Test;
@@ -24,7 +25,7 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
/**
* @author jaysunxiao
* @author godotg
* @version 3.0
*/
@Ignore
@@ -54,9 +55,9 @@ public class SignalBridgeTest {
var countDownLatch = new CountDownLatch(executorSize);
for (var i = 0; i < executorSize; i++) {
EventBus.execute(i).execute(new Runnable() {
EventBus.execute(i, new SafeRunnable() {
@Override
public void run() {
public void doRun() {
addAndRemoveArray();
countDownLatch.countDown();
}
@@ -72,9 +73,9 @@ public class SignalBridgeTest {
var countDownLatch = new CountDownLatch(executorSize);
for (int i = 0; i < executorSize; i++) {
EventBus.execute(i).execute(new Runnable() {
EventBus.execute(i, new SafeRunnable() {
@Override
public void run() {
public void doRun() {
addAndRemoveMap();
countDownLatch.countDown();
}
@@ -20,6 +20,7 @@ import com.zfoo.orm.model.vo.EntityDef;
import com.zfoo.protocol.exception.ExceptionUtils;
import com.zfoo.scheduler.manager.SchedulerBus;
import com.zfoo.scheduler.util.TimeUtils;
import com.zfoo.util.SafeRunnable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.scheduling.support.CronExpression;
@@ -27,7 +28,7 @@ import org.springframework.scheduling.support.CronExpression;
import java.util.concurrent.TimeUnit;
/**
* @author jaysunxiao
* @author godotg
* @version 3.0
*/
public class CronOrmPersister extends AbstractOrmPersister {
@@ -74,13 +75,13 @@ public class CronOrmPersister extends AbstractOrmPersister {
}
if (!OrmContext.isStop()) {
SchedulerBus.schedule(new Runnable() {
SchedulerBus.schedule(new SafeRunnable() {
@Override
public void run() {
public void doRun() {
if (!OrmContext.isStop()) {
EventBus.execute(entityDef.getClazz().hashCode()).execute(new Runnable() {
EventBus.execute(entityDef.getClazz().hashCode(), new SafeRunnable() {
@Override
public void run() {
public void doRun() {
entityCaches.persistAll();
schedulePersist();
}
@@ -19,11 +19,12 @@ import com.zfoo.orm.model.cache.EntityCaches;
import com.zfoo.orm.model.vo.EntityDef;
import com.zfoo.protocol.util.StringUtils;
import com.zfoo.scheduler.manager.SchedulerBus;
import com.zfoo.util.SafeRunnable;
import java.util.concurrent.TimeUnit;
/**
* @author jaysunxiao
* @author godotg
* @version 3.0
*/
public class TimeOrmPersister extends AbstractOrmPersister {
@@ -43,13 +44,13 @@ public class TimeOrmPersister extends AbstractOrmPersister {
@Override
public void start() {
SchedulerBus.scheduleAtFixedRate(new Runnable() {
SchedulerBus.scheduleAtFixedRate(new SafeRunnable() {
@Override
public void run() {
public void doRun() {
if (!OrmContext.isStop()) {
EventBus.execute(entityDef.getClazz().hashCode()).execute(new Runnable() {
EventBus.execute(entityDef.getClazz().hashCode(), new SafeRunnable() {
@Override
public void run() {
public void doRun() {
entityCaches.persistAll();
}
});
@@ -15,6 +15,7 @@ package com.zfoo.orm.lpmap;
import com.zfoo.event.manager.EventBus;
import com.zfoo.orm.lpmap.model.MyPacket;
import com.zfoo.protocol.ProtocolManager;
import com.zfoo.util.SafeRunnable;
import org.junit.Assert;
import org.junit.Ignore;
import org.junit.Test;
@@ -25,7 +26,7 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
/**
* @author jaysunxiao
* @author godotg
* @version 3.0
*/
@Ignore
@@ -41,9 +42,9 @@ public class ConcurrentFileChannelMapTest {
var countdown = new CountDownLatch(EventBus.EXECUTORS_SIZE);
for (int i = 0; i < EventBus.EXECUTORS_SIZE; i++) {
EventBus.asyncExecute().execute(new Runnable() {
EventBus.asyncExecute(new SafeRunnable() {
@Override
public void run() {
public void doRun() {
var key = atomicInt.getAndIncrement();
while (key < count) {
var myPacket = MyPacket.valueOf(key, String.valueOf(key));
@@ -15,6 +15,7 @@ package com.zfoo.orm.lpmap;
import com.zfoo.event.manager.EventBus;
import com.zfoo.orm.lpmap.model.MyPacket;
import com.zfoo.protocol.ProtocolManager;
import com.zfoo.util.SafeRunnable;
import org.junit.Assert;
import org.junit.Ignore;
import org.junit.Test;
@@ -25,7 +26,7 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
/**
* @author jaysunxiao
* @author godotg
* @version 3.0
*/
@Ignore
@@ -58,9 +59,9 @@ public class ConcurrentHeapMapTest {
var countdown = new CountDownLatch(EventBus.EXECUTORS_SIZE);
for (int i = 0; i < EventBus.EXECUTORS_SIZE; i++) {
EventBus.asyncExecute().execute(new Runnable() {
EventBus.asyncExecute(new SafeRunnable() {
@Override
public void run() {
public void doRun() {
var key = atomicInt.getAndIncrement();
while (key < count) {
var myPacket = MyPacket.valueOf(key, String.valueOf(key));
@@ -14,22 +14,23 @@
package com.zfoo.scheduler.manager;
import com.zfoo.protocol.collection.CollectionUtils;
import com.zfoo.protocol.util.StringUtils;
import com.zfoo.scheduler.SchedulerContext;
import com.zfoo.scheduler.model.vo.SchedulerDefinition;
import com.zfoo.scheduler.util.TimeUtils;
import com.zfoo.util.SafeRunnable;
import io.netty.util.concurrent.FastThreadLocalThread;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.time.Instant;
import java.time.ZonedDateTime;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
/**
* @author jaysunxiao
* @author godotg
* @version 3.0
*/
public abstract class SchedulerBus {
@@ -41,6 +42,9 @@ public abstract class SchedulerBus {
* scheduler默认只有一个单线程的线程池
*/
private static final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(new SchedulerThreadFactory(1));
private static long threadId = 0;
/**
* 上一次trigger触发时间
*/
@@ -63,6 +67,31 @@ public abstract class SchedulerBus {
}, 0, TRIGGER_MILLIS_INTERVAL, TimeUnit.MILLISECONDS);
}
public static class SchedulerThreadFactory implements ThreadFactory {
private int poolNumber;
private final AtomicInteger threadNumber = new AtomicInteger(1);
private final ThreadGroup group;
public SchedulerThreadFactory(int poolNumber) {
var s = System.getSecurityManager();
group = (s != null) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup();
this.poolNumber = poolNumber;
}
@Override
public Thread newThread(Runnable runnable) {
var threadName = StringUtils.format("scheduler-p{}-t{}", poolNumber, threadNumber.getAndIncrement());
var thread = new FastThreadLocalThread(group, runnable, threadName, 0);
thread.setDaemon(false);
thread.setPriority(Thread.NORM_PRIORITY);
thread.setUncaughtExceptionHandler((t, e) -> logger.error(t.toString(), e));
threadId = thread.getId();
return thread;
}
}
public static void refreshMinTriggerTimestamp() {
var minTimestamp = Long.MAX_VALUE;
@@ -134,46 +163,24 @@ public abstract class SchedulerBus {
/**
* 不断执行的周期循环任务
*/
public static void scheduleAtFixedRate(Runnable runnable, long period, TimeUnit unit) {
public static void scheduleAtFixedRate(SafeRunnable runnable, long period, TimeUnit unit) {
if (SchedulerContext.isStop()) {
return;
}
executor.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
runnable.run();
} catch (Exception e) {
logger.error("scheduleAtFixedRate未知exception异常", e);
} catch (Throwable t) {
logger.error("scheduleAtFixedRate未知error异常", t);
}
}
}, 0, period, unit);
executor.scheduleAtFixedRate(runnable, 0, period, unit);
}
/**
* 固定延迟执行的任务
*/
public static void schedule(Runnable runnable, long delay, TimeUnit unit) {
public static void schedule(SafeRunnable runnable, long delay, TimeUnit unit) {
if (SchedulerContext.isStop()) {
return;
}
executor.schedule(new Runnable() {
@Override
public void run() {
try {
runnable.run();
} catch (Exception e) {
logger.error("schedule未知exception异常", e);
} catch (Throwable t) {
logger.error("schedule未知error异常", t);
}
}
}, delay, unit);
executor.schedule(runnable, delay, unit);
}
/**
@@ -186,4 +193,8 @@ public abstract class SchedulerBus {
schedulerDefList.add(SchedulerDefinition.valueOf(cron, runnable));
}
public static Executor threadExecutor(long currentThreadId) {
return threadId == currentThreadId ? executor : null;
}
}
@@ -1,50 +0,0 @@
/*
* Copyright (C) 2020 The zfoo Authors
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and limitations under the License.
*/
package com.zfoo.scheduler.manager;
import io.netty.util.concurrent.FastThreadLocalThread;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;
/**
* @author jaysunxiao
* @version 3.0
*/
public class SchedulerThreadFactory implements ThreadFactory {
private static final Logger logger = LoggerFactory.getLogger(SchedulerThreadFactory.class);
private final ThreadGroup group;
private final AtomicInteger threadNumber = new AtomicInteger(1);
private final String namePrefix;
public SchedulerThreadFactory(int poolNumber) {
var s = System.getSecurityManager();
group = (s != null) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup();
namePrefix = "scheduler-p" + poolNumber + "-t";
}
@Override
public Thread newThread(Runnable runnable) {
var t = new FastThreadLocalThread(group, runnable, namePrefix + threadNumber.getAndIncrement(), 0);
t.setDaemon(false);
t.setPriority(Thread.NORM_PRIORITY);
t.setUncaughtExceptionHandler((thread, e) -> logger.error(thread.toString(), e));
return t;
}
}
@@ -1,10 +1,10 @@
package com.zfoo.net.task.model;
package com.zfoo.util;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author jaysunxiao
* @author godotg
* @version 3.0
*/
public abstract class SafeRunnable implements Runnable {
@@ -23,7 +23,7 @@ import java.util.TreeMap;
/**
* 带虚拟节点的一致性Hash算法,参考:http://www.zsythink.net/archives/1182
*
* @author jaysunxiao
* @author godotg
* @version 3.0
*/
@@ -35,7 +35,7 @@ public class ConsistentHash<K, V> {
// 虚拟节点,key表示虚拟节点的hash值,value表示虚拟节点的名称
private TreeMap<Integer, Pair<K, V>> virtualNodeTreeMap = new TreeMap<>();
// 虚拟节点的数目,数量越大约均匀,经验值150
// 虚拟节点的数目,数量越大约均匀
private int virtualNodes = 0;
public ConsistentHash(List<Pair<K, V>> realNodes, int virtualNodes) {