From 004d96a3dde8e92d7c8ffef0c0d3f188bd6f30c7 Mon Sep 17 00:00:00 2001 From: godotg Date: Thu, 28 Jul 2022 15:35:44 +0800 Subject: [PATCH] =?UTF-8?q?perf[net]:=20=E5=9C=A8task=EF=BC=8Cevent?= =?UTF-8?q?=EF=BC=8Cscheduler=E7=BA=BF=E7=A8=8B=E6=89=A7=E8=A1=8C=E7=9A=84?= =?UTF-8?q?=E5=BC=82=E6=AD=A5=E8=AF=B7=E6=B1=82=EF=BC=8C=E8=AF=B7=E6=B1=82?= =?UTF-8?q?=E6=88=90=E5=8A=9F=E8=BF=87=E5=90=8E=E4=BE=9D=E7=84=B6=E5=9C=A8?= =?UTF-8?q?=E7=9B=B8=E5=90=8C=E7=9A=84=E7=BA=BF=E7=A8=8B=E6=89=A7=E8=A1=8C?= =?UTF-8?q?=E5=9B=9E=E8=B0=83=E4=BB=BB=E5=8A=A1?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../java/com/zfoo/event/manager/EventBus.java | 66 ++++++++++++----- .../event/manager/EventThreadFactory.java | 50 ------------- .../consumer/registry/ZookeeperRegistry.java | 14 +++- .../zfoo/net/router/answer/AsyncAnswer.java | 4 +- .../zfoo/net/router/answer/IAsyncAnswer.java | 4 +- .../main/java/com/zfoo/net/task/TaskBus.java | 64 +++++++++------- .../ConsistentHashTaskDispatch.java | 14 ++-- .../net/task/dispatcher/ITaskDispatch.java | 5 +- .../task/dispatcher/RandomTaskDispatch.java | 8 +- .../dispatcher/SessionIdTaskDispatch.java | 7 +- .../java/com/zfoo/net/util/SimpleCache.java | 11 +-- .../java/com/zfoo/net/util/SingleCache.java | 7 +- .../com/zfoo/net/router/SignalBridgeTest.java | 11 +-- .../orm/model/persister/CronOrmPersister.java | 11 +-- .../orm/model/persister/TimeOrmPersister.java | 11 +-- .../lpmap/ConcurrentFileChannelMapTest.java | 7 +- .../zfoo/orm/lpmap/ConcurrentHeapMapTest.java | 7 +- .../zfoo/scheduler/manager/SchedulerBus.java | 73 +++++++++++-------- .../manager/SchedulerThreadFactory.java | 50 ------------- .../java/com/zfoo/util}/SafeRunnable.java | 4 +- .../com/zfoo/util/math/ConsistentHash.java | 4 +- 21 files changed, 202 insertions(+), 230 deletions(-) delete mode 100644 event/src/main/java/com/zfoo/event/manager/EventThreadFactory.java delete mode 100644 scheduler/src/main/java/com/zfoo/scheduler/manager/SchedulerThreadFactory.java rename {net/src/main/java/com/zfoo/net/task/model => util/src/main/java/com/zfoo/util}/SafeRunnable.java (90%) diff --git a/event/src/main/java/com/zfoo/event/manager/EventBus.java b/event/src/main/java/com/zfoo/event/manager/EventBus.java index fd11a3ee..7eb41cfc 100644 --- a/event/src/main/java/com/zfoo/event/manager/EventBus.java +++ b/event/src/main/java/com/zfoo/event/manager/EventBus.java @@ -16,7 +16,11 @@ package com.zfoo.event.manager; import com.zfoo.event.model.event.IEvent; import com.zfoo.event.model.vo.IEventReceiver; import com.zfoo.protocol.collection.CollectionUtils; +import com.zfoo.protocol.util.AssertionUtils; +import com.zfoo.protocol.util.StringUtils; +import com.zfoo.util.SafeRunnable; import com.zfoo.util.math.RandomUtils; +import io.netty.util.concurrent.FastThreadLocalThread; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -24,12 +28,11 @@ import java.util.HashMap; import java.util.LinkedList; import java.util.List; import java.util.Map; -import java.util.concurrent.Executor; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; +import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicInteger; /** - * @author jaysunxiao + * @author godotg * @version 3.0 */ public abstract class EventBus { @@ -43,12 +46,43 @@ public abstract class EventBus { private static final ExecutorService[] executors = new ExecutorService[EXECUTORS_SIZE]; + private static final Map threadMap = new ConcurrentHashMap<>(); + private static final Map, List> receiverMap = new HashMap<>(); static { for (int i = 0; i < executors.length; i++) { var namedThreadFactory = new EventThreadFactory(i + 1); - executors[i] = Executors.newSingleThreadExecutor(namedThreadFactory); + var executor = Executors.newSingleThreadExecutor(namedThreadFactory); + namedThreadFactory.executor = executor; + executors[i] = executor; + } + } + + public static class EventThreadFactory implements ThreadFactory { + + public ExecutorService executor; + + private int poolNumber; + private AtomicInteger threadNumber = new AtomicInteger(1); + private ThreadGroup group; + + public EventThreadFactory(int poolNumber) { + var s = System.getSecurityManager(); + group = (s != null) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup(); + this.poolNumber = poolNumber; + } + + @Override + public Thread newThread(Runnable runnable) { + var threadName = StringUtils.format("event-p{}-t{}", poolNumber, threadNumber.getAndIncrement()); + var thread = new FastThreadLocalThread(group, runnable, threadName, 0); + thread.setDaemon(false); + thread.setPriority(Thread.NORM_PRIORITY); + thread.setUncaughtExceptionHandler((t, e) -> logger.error(t.toString(), e)); + AssertionUtils.notNull(executor); + threadMap.put(thread.getId(), executor); + return thread; } } @@ -77,19 +111,11 @@ public abstract class EventBus { return; } - executors[Math.abs(event.threadId() % EXECUTORS_SIZE)].execute(new Runnable() { - @Override - public void run() { - doSubmit(event, list); - } - }); + executors[Math.abs(event.threadId() % EXECUTORS_SIZE)].execute(() -> doSubmit(event, list)); } - /** - * 随机获取一个线程 - */ - public static Executor asyncExecute() { - return executors[RandomUtils.randomInt(EXECUTORS_SIZE)]; + public static void asyncExecute(SafeRunnable runnable) { + execute(RandomUtils.randomInt(EXECUTORS_SIZE), runnable); } /** @@ -98,8 +124,8 @@ public abstract class EventBus { * @param hashcode * @return */ - public static Executor execute(int hashcode) { - return executors[Math.abs(hashcode % EXECUTORS_SIZE)]; + public static void execute(int hashcode, SafeRunnable runnable) { + executors[Math.abs(hashcode % EXECUTORS_SIZE)].execute(runnable); } /** @@ -123,6 +149,7 @@ public abstract class EventBus { /** * 注册事件及其对应观察者 * + * * @param eventType * @param receiver */ @@ -130,6 +157,9 @@ public abstract class EventBus { receiverMap.computeIfAbsent(eventType, it -> new LinkedList<>()).add(receiver); } + public static Executor threadExecutor(long currentThreadId) { + return threadMap.get(currentThreadId); + } } diff --git a/event/src/main/java/com/zfoo/event/manager/EventThreadFactory.java b/event/src/main/java/com/zfoo/event/manager/EventThreadFactory.java deleted file mode 100644 index 9965e293..00000000 --- a/event/src/main/java/com/zfoo/event/manager/EventThreadFactory.java +++ /dev/null @@ -1,50 +0,0 @@ -/* - * Copyright (C) 2020 The zfoo Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except - * in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed - * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and limitations under the License. - */ - -package com.zfoo.event.manager; - -import io.netty.util.concurrent.FastThreadLocalThread; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.concurrent.ThreadFactory; -import java.util.concurrent.atomic.AtomicInteger; - -/** - * @author jaysunxiao - * @version 3.0 - */ -public class EventThreadFactory implements ThreadFactory { - - private static final Logger logger = LoggerFactory.getLogger(EventThreadFactory.class); - - private final ThreadGroup group; - private final AtomicInteger threadNumber = new AtomicInteger(1); - private final String namePrefix; - - public EventThreadFactory(int poolNumber) { - var s = System.getSecurityManager(); - group = (s != null) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup(); - namePrefix = "event-p" + poolNumber + "-t"; - } - - @Override - public Thread newThread(Runnable runnable) { - var t = new FastThreadLocalThread(group, runnable, namePrefix + threadNumber.getAndIncrement(), 0); - t.setDaemon(false); - t.setPriority(Thread.NORM_PRIORITY); - t.setUncaughtExceptionHandler((thread, e) -> logger.error(thread.toString(), e)); - return t; - } - -} diff --git a/net/src/main/java/com/zfoo/net/consumer/registry/ZookeeperRegistry.java b/net/src/main/java/com/zfoo/net/consumer/registry/ZookeeperRegistry.java index 7983b94d..9a8341a6 100644 --- a/net/src/main/java/com/zfoo/net/consumer/registry/ZookeeperRegistry.java +++ b/net/src/main/java/com/zfoo/net/consumer/registry/ZookeeperRegistry.java @@ -29,6 +29,7 @@ import com.zfoo.protocol.util.IOUtils; import com.zfoo.protocol.util.JsonUtils; import com.zfoo.protocol.util.StringUtils; import com.zfoo.scheduler.manager.SchedulerBus; +import com.zfoo.util.SafeRunnable; import com.zfoo.util.ThreadUtils; import com.zfoo.util.net.HostAndPort; import io.netty.util.concurrent.FastThreadLocalThread; @@ -66,7 +67,7 @@ import java.util.stream.Collectors; /** * 服务注册,服务发现 * - * @author jaysunxiao + * @author godotg * @version 3.0 */ public class ZookeeperRegistry implements IRegistry { @@ -360,9 +361,9 @@ public class ZookeeperRegistry implements IRegistry { } catch (Exception e) { // logger.error("zookeeper初始化失败,等待[{}]秒,重新初始化", RETRY_SECONDS, e); - SchedulerBus.schedule(new Runnable() { + SchedulerBus.schedule(new SafeRunnable() { @Override - public void run() { + public void doRun() { initZookeeper(); } }, RETRY_SECONDS, TimeUnit.SECONDS); @@ -521,7 +522,12 @@ public class ZookeeperRegistry implements IRegistry { } if (recheckFlag) { - SchedulerBus.schedule(() -> checkConsumer(), RETRY_SECONDS, TimeUnit.SECONDS); + SchedulerBus.schedule(new SafeRunnable() { + @Override + public void doRun() { + checkConsumer(); + } + }, RETRY_SECONDS, TimeUnit.SECONDS); } } diff --git a/net/src/main/java/com/zfoo/net/router/answer/AsyncAnswer.java b/net/src/main/java/com/zfoo/net/router/answer/AsyncAnswer.java index f638ca6c..7b935ceb 100644 --- a/net/src/main/java/com/zfoo/net/router/answer/AsyncAnswer.java +++ b/net/src/main/java/com/zfoo/net/router/answer/AsyncAnswer.java @@ -14,15 +14,15 @@ package com.zfoo.net.router.answer; import com.zfoo.net.router.attachment.SignalAttachment; -import com.zfoo.net.task.model.SafeRunnable; import com.zfoo.protocol.IPacket; +import com.zfoo.util.SafeRunnable; import java.util.ArrayList; import java.util.List; import java.util.function.Consumer; /** - * @author jaysunxiao + * @author godotg * @version 3.0 */ public class AsyncAnswer implements IAsyncAnswer { diff --git a/net/src/main/java/com/zfoo/net/router/answer/IAsyncAnswer.java b/net/src/main/java/com/zfoo/net/router/answer/IAsyncAnswer.java index 705dc7dd..22ccc830 100644 --- a/net/src/main/java/com/zfoo/net/router/answer/IAsyncAnswer.java +++ b/net/src/main/java/com/zfoo/net/router/answer/IAsyncAnswer.java @@ -13,13 +13,13 @@ package com.zfoo.net.router.answer; -import com.zfoo.net.task.model.SafeRunnable; import com.zfoo.protocol.IPacket; +import com.zfoo.util.SafeRunnable; import java.util.function.Consumer; /** - * @author jaysunxiao + * @author godotg * @version 3.0 */ public interface IAsyncAnswer { diff --git a/net/src/main/java/com/zfoo/net/task/TaskBus.java b/net/src/main/java/com/zfoo/net/task/TaskBus.java index 84759249..51246018 100644 --- a/net/src/main/java/com/zfoo/net/task/TaskBus.java +++ b/net/src/main/java/com/zfoo/net/task/TaskBus.java @@ -13,27 +13,26 @@ package com.zfoo.net.task; +import com.zfoo.event.manager.EventBus; import com.zfoo.net.NetContext; import com.zfoo.net.task.dispatcher.AbstractTaskDispatch; import com.zfoo.net.task.dispatcher.ITaskDispatch; import com.zfoo.net.task.model.PacketReceiverTask; import com.zfoo.protocol.util.AssertionUtils; import com.zfoo.protocol.util.StringUtils; -import com.zfoo.util.math.HashUtils; +import com.zfoo.scheduler.manager.SchedulerBus; +import com.zfoo.util.SafeRunnable; import com.zfoo.util.math.RandomUtils; import io.netty.util.concurrent.FastThreadLocalThread; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.ThreadFactory; +import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicInteger; /** - * @author jaysunxiao + * @author godotg * @version 3.0 */ public final class TaskBus { @@ -76,9 +75,9 @@ public final class TaskBus { public ExecutorService executor; - private final int poolNumber; - private final AtomicInteger threadNumber = new AtomicInteger(1); - private final ThreadGroup group; + private int poolNumber; + private AtomicInteger threadNumber = new AtomicInteger(1); + private ThreadGroup group; public TaskThreadFactory(int poolNumber) { var s = System.getSecurityManager(); @@ -89,14 +88,13 @@ public final class TaskBus { @Override public Thread newThread(Runnable runnable) { var threadName = StringUtils.format("task-p{}-t{}", poolNumber, threadNumber.getAndIncrement()); - var t = new FastThreadLocalThread(group, runnable, threadName, 0); - t.setDaemon(false); - t.setPriority(Thread.NORM_PRIORITY); - t.setUncaughtExceptionHandler((thread, e) -> logger.error(thread.toString(), e)); + var thread = new FastThreadLocalThread(group, runnable, threadName, 0); + thread.setDaemon(false); + thread.setPriority(Thread.NORM_PRIORITY); + thread.setUncaughtExceptionHandler((t, e) -> logger.error(t.toString(), e)); AssertionUtils.notNull(executor); - var threadId = t.getId(); - threadMap.put(threadId, executor); - return t; + threadMap.put(thread.getId(), executor); + return thread; } } @@ -120,22 +118,36 @@ public final class TaskBus { */ public static void submit(PacketReceiverTask task) { // 里面会看到是:其中一致性hash是根据附加包记录的hashId进行选择哪个线程进行业务处理 - taskDispatch.getExecutor(task).execute(task); + taskDispatch.getExecutor(executors, task).execute(task); } - public static ExecutorService executor(Object hashObj) { - return executors[Math.abs(HashUtils.fnvHash(hashObj.hashCode()) % EXECUTOR_SIZE)]; + public static int executorIndex(int executorConsistentHash) { + return Math.abs(executorConsistentHash % EXECUTOR_SIZE); } - // 在task线程的异步请求,请求成功过后依然在相同的task线程执行回调任务 - public static ExecutorService currentThreadExecutor() { + public static void executor(int executorConsistentHash, SafeRunnable runnable) { + executors[executorIndex(executorConsistentHash)].execute(runnable); + } + + // 在task,event,scheduler线程执行的异步请求,请求成功过后依然在相同的线程执行回调任务 + public static Executor currentThreadExecutor() { var threadId = Thread.currentThread().getId(); - var executor = threadMap.get(threadId); - if (executor == null) { - logger.error("threadId:[{}]找不到对应的executor", threadId); - return executor(RandomUtils.randomInt()); + var taskExecutor = threadMap.get(threadId); + if (taskExecutor != null) { + return taskExecutor; } - return executor; + + var eventExecutor = EventBus.threadExecutor(threadId); + if (eventExecutor != null) { + return eventExecutor; + } + + var schedulerExecutor = SchedulerBus.threadExecutor(threadId); + if (schedulerExecutor != null) { + return schedulerExecutor; + } + + return executors[executorIndex(RandomUtils.randomInt())]; } } diff --git a/net/src/main/java/com/zfoo/net/task/dispatcher/ConsistentHashTaskDispatch.java b/net/src/main/java/com/zfoo/net/task/dispatcher/ConsistentHashTaskDispatch.java index f723d63e..3e19e934 100644 --- a/net/src/main/java/com/zfoo/net/task/dispatcher/ConsistentHashTaskDispatch.java +++ b/net/src/main/java/com/zfoo/net/task/dispatcher/ConsistentHashTaskDispatch.java @@ -16,11 +16,13 @@ package com.zfoo.net.task.dispatcher; import com.zfoo.net.session.model.AttributeType; import com.zfoo.net.task.TaskBus; import com.zfoo.net.task.model.PacketReceiverTask; +import com.zfoo.util.math.HashUtils; +import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; /** - * @author jaysunxiao + * @author godotg * @version 3.0 */ public class ConsistentHashTaskDispatch extends AbstractTaskDispatch { @@ -32,22 +34,22 @@ public class ConsistentHashTaskDispatch extends AbstractTaskDispatch { } @Override - public ExecutorService getExecutor(PacketReceiverTask packetReceiverTask) { + public Executor getExecutor(ExecutorService[] executors, PacketReceiverTask packetReceiverTask) { var attachment = packetReceiverTask.getAttachment(); if (attachment == null) { var session = packetReceiverTask.getSession(); - Long uid = session.getAttribute(AttributeType.UID); + var uid = session.getAttribute(AttributeType.UID); if (uid == null) { - return SessionIdTaskDispatch.getInstance().getExecutor(packetReceiverTask); + return SessionIdTaskDispatch.getInstance().getExecutor(executors, packetReceiverTask); } else { - return TaskBus.executor(uid); + return executors[TaskBus.executorIndex(HashUtils.fnvHash(uid))]; } } // 可见最终是根据附加包的信息选择服务端由哪个线程执行这个业务 - return TaskBus.executor(attachment.executorConsistentHash()); + return executors[TaskBus.executorIndex(attachment.executorConsistentHash())]; } } diff --git a/net/src/main/java/com/zfoo/net/task/dispatcher/ITaskDispatch.java b/net/src/main/java/com/zfoo/net/task/dispatcher/ITaskDispatch.java index 1903f5ed..03be2830 100644 --- a/net/src/main/java/com/zfoo/net/task/dispatcher/ITaskDispatch.java +++ b/net/src/main/java/com/zfoo/net/task/dispatcher/ITaskDispatch.java @@ -15,14 +15,15 @@ package com.zfoo.net.task.dispatcher; import com.zfoo.net.task.model.PacketReceiverTask; +import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; /** - * @author jaysunxiao + * @author godotg * @version 3.0 */ public interface ITaskDispatch { - ExecutorService getExecutor(PacketReceiverTask packetReceiverTask); + Executor getExecutor(ExecutorService[] executors, PacketReceiverTask packetReceiverTask); } diff --git a/net/src/main/java/com/zfoo/net/task/dispatcher/RandomTaskDispatch.java b/net/src/main/java/com/zfoo/net/task/dispatcher/RandomTaskDispatch.java index 23d138bb..e9548f89 100644 --- a/net/src/main/java/com/zfoo/net/task/dispatcher/RandomTaskDispatch.java +++ b/net/src/main/java/com/zfoo/net/task/dispatcher/RandomTaskDispatch.java @@ -15,11 +15,13 @@ package com.zfoo.net.task.dispatcher; import com.zfoo.net.task.TaskBus; import com.zfoo.net.task.model.PacketReceiverTask; +import com.zfoo.util.math.RandomUtils; +import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; /** - * @author jaysunxiao + * @author godotg * @version 3.0 */ public class RandomTaskDispatch extends AbstractTaskDispatch { @@ -31,8 +33,8 @@ public class RandomTaskDispatch extends AbstractTaskDispatch { } @Override - public ExecutorService getExecutor(PacketReceiverTask packetReceiverTask) { - return TaskBus.executor(-1); + public Executor getExecutor(ExecutorService[] executors, PacketReceiverTask packetReceiverTask) { + return executors[TaskBus.executorIndex(RandomUtils.randomInt())]; } } diff --git a/net/src/main/java/com/zfoo/net/task/dispatcher/SessionIdTaskDispatch.java b/net/src/main/java/com/zfoo/net/task/dispatcher/SessionIdTaskDispatch.java index 0a67eb39..000637c4 100644 --- a/net/src/main/java/com/zfoo/net/task/dispatcher/SessionIdTaskDispatch.java +++ b/net/src/main/java/com/zfoo/net/task/dispatcher/SessionIdTaskDispatch.java @@ -17,12 +17,13 @@ import com.zfoo.net.task.TaskBus; import com.zfoo.net.task.model.PacketReceiverTask; import com.zfoo.util.math.HashUtils; +import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; /** * 同一个session总是分配到同一个线程池执行 * - * @author jaysunxiao + * @author godotg * @version 3.0 */ public class SessionIdTaskDispatch extends AbstractTaskDispatch { @@ -34,9 +35,9 @@ public class SessionIdTaskDispatch extends AbstractTaskDispatch { } @Override - public ExecutorService getExecutor(PacketReceiverTask packetReceiverTask) { + public Executor getExecutor(ExecutorService[] executors, PacketReceiverTask packetReceiverTask) { var session = packetReceiverTask.getSession(); - return TaskBus.executor(session.getSid()); + return executors[TaskBus.executorIndex(HashUtils.fnvHash(session.getSid()))]; } } diff --git a/net/src/main/java/com/zfoo/net/util/SimpleCache.java b/net/src/main/java/com/zfoo/net/util/SimpleCache.java index 3ae1b580..a4779606 100644 --- a/net/src/main/java/com/zfoo/net/util/SimpleCache.java +++ b/net/src/main/java/com/zfoo/net/util/SimpleCache.java @@ -20,6 +20,7 @@ import com.zfoo.event.manager.EventBus; import com.zfoo.protocol.collection.CollectionUtils; import com.zfoo.protocol.model.Pair; import com.zfoo.scheduler.manager.SchedulerBus; +import com.zfoo.util.SafeRunnable; import org.checkerframework.checker.nullness.qual.NonNull; import org.checkerframework.checker.nullness.qual.Nullable; @@ -38,7 +39,7 @@ import java.util.function.Function; *

* 批量查找通过batchLoadCallback方法查找,当查找的key不存在的时候,调用defaultValueBuilder生成一个默认值放入缓存。 * - * @author jaysunxiao + * @author godotg * @version 3.0 */ public class SimpleCache { @@ -93,13 +94,13 @@ public class SimpleCache { }); - SchedulerBus.scheduleAtFixedRate(new Runnable() { + SchedulerBus.scheduleAtFixedRate(new SafeRunnable() { @Override - public void run() { + public void doRun() { // 不在任务调度线程中执行耗时任务,因为任务调度线程只有一个线程池 - EventBus.asyncExecute().execute(new Runnable() { + EventBus.asyncExecute(new SafeRunnable() { @Override - public void run() { + public void doRun() { var list = new ArrayList(); while (!linkedQueue.isEmpty()) { var key = linkedQueue.poll(); diff --git a/net/src/main/java/com/zfoo/net/util/SingleCache.java b/net/src/main/java/com/zfoo/net/util/SingleCache.java index ed550a20..bca28574 100644 --- a/net/src/main/java/com/zfoo/net/util/SingleCache.java +++ b/net/src/main/java/com/zfoo/net/util/SingleCache.java @@ -15,6 +15,7 @@ package com.zfoo.net.util; import com.zfoo.event.manager.EventBus; import com.zfoo.scheduler.util.TimeUtils; +import com.zfoo.util.SafeRunnable; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; @@ -23,7 +24,7 @@ import java.util.function.Supplier; /** * 单值缓存,会隔一段时间在后台刷新一下缓存 * - * @author jaysunxiao + * @author godotg * @version 3.0 */ public class SingleCache { @@ -61,9 +62,9 @@ public class SingleCache { try { if (now > refreshTime) { refreshTime = now + refreshDuration; - EventBus.asyncExecute().execute(new Runnable() { + EventBus.asyncExecute(new SafeRunnable() { @Override - public void run() { + public void doRun() { cache = supplier.get(); } }); diff --git a/net/src/test/java/com/zfoo/net/router/SignalBridgeTest.java b/net/src/test/java/com/zfoo/net/router/SignalBridgeTest.java index ddb6b9d8..ffb23d8e 100644 --- a/net/src/test/java/com/zfoo/net/router/SignalBridgeTest.java +++ b/net/src/test/java/com/zfoo/net/router/SignalBridgeTest.java @@ -15,6 +15,7 @@ package com.zfoo.net.router; import com.zfoo.event.manager.EventBus; import com.zfoo.net.router.route.SignalBridge; import com.zfoo.scheduler.util.TimeUtils; +import com.zfoo.util.SafeRunnable; import com.zfoo.util.ThreadUtils; import org.junit.Ignore; import org.junit.Test; @@ -24,7 +25,7 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicInteger; /** - * @author jaysunxiao + * @author godotg * @version 3.0 */ @Ignore @@ -54,9 +55,9 @@ public class SignalBridgeTest { var countDownLatch = new CountDownLatch(executorSize); for (var i = 0; i < executorSize; i++) { - EventBus.execute(i).execute(new Runnable() { + EventBus.execute(i, new SafeRunnable() { @Override - public void run() { + public void doRun() { addAndRemoveArray(); countDownLatch.countDown(); } @@ -72,9 +73,9 @@ public class SignalBridgeTest { var countDownLatch = new CountDownLatch(executorSize); for (int i = 0; i < executorSize; i++) { - EventBus.execute(i).execute(new Runnable() { + EventBus.execute(i, new SafeRunnable() { @Override - public void run() { + public void doRun() { addAndRemoveMap(); countDownLatch.countDown(); } diff --git a/orm/src/main/java/com/zfoo/orm/model/persister/CronOrmPersister.java b/orm/src/main/java/com/zfoo/orm/model/persister/CronOrmPersister.java index 8a09cb33..d6d64c5a 100644 --- a/orm/src/main/java/com/zfoo/orm/model/persister/CronOrmPersister.java +++ b/orm/src/main/java/com/zfoo/orm/model/persister/CronOrmPersister.java @@ -20,6 +20,7 @@ import com.zfoo.orm.model.vo.EntityDef; import com.zfoo.protocol.exception.ExceptionUtils; import com.zfoo.scheduler.manager.SchedulerBus; import com.zfoo.scheduler.util.TimeUtils; +import com.zfoo.util.SafeRunnable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.scheduling.support.CronExpression; @@ -27,7 +28,7 @@ import org.springframework.scheduling.support.CronExpression; import java.util.concurrent.TimeUnit; /** - * @author jaysunxiao + * @author godotg * @version 3.0 */ public class CronOrmPersister extends AbstractOrmPersister { @@ -74,13 +75,13 @@ public class CronOrmPersister extends AbstractOrmPersister { } if (!OrmContext.isStop()) { - SchedulerBus.schedule(new Runnable() { + SchedulerBus.schedule(new SafeRunnable() { @Override - public void run() { + public void doRun() { if (!OrmContext.isStop()) { - EventBus.execute(entityDef.getClazz().hashCode()).execute(new Runnable() { + EventBus.execute(entityDef.getClazz().hashCode(), new SafeRunnable() { @Override - public void run() { + public void doRun() { entityCaches.persistAll(); schedulePersist(); } diff --git a/orm/src/main/java/com/zfoo/orm/model/persister/TimeOrmPersister.java b/orm/src/main/java/com/zfoo/orm/model/persister/TimeOrmPersister.java index d0ded106..393fd816 100644 --- a/orm/src/main/java/com/zfoo/orm/model/persister/TimeOrmPersister.java +++ b/orm/src/main/java/com/zfoo/orm/model/persister/TimeOrmPersister.java @@ -19,11 +19,12 @@ import com.zfoo.orm.model.cache.EntityCaches; import com.zfoo.orm.model.vo.EntityDef; import com.zfoo.protocol.util.StringUtils; import com.zfoo.scheduler.manager.SchedulerBus; +import com.zfoo.util.SafeRunnable; import java.util.concurrent.TimeUnit; /** - * @author jaysunxiao + * @author godotg * @version 3.0 */ public class TimeOrmPersister extends AbstractOrmPersister { @@ -43,13 +44,13 @@ public class TimeOrmPersister extends AbstractOrmPersister { @Override public void start() { - SchedulerBus.scheduleAtFixedRate(new Runnable() { + SchedulerBus.scheduleAtFixedRate(new SafeRunnable() { @Override - public void run() { + public void doRun() { if (!OrmContext.isStop()) { - EventBus.execute(entityDef.getClazz().hashCode()).execute(new Runnable() { + EventBus.execute(entityDef.getClazz().hashCode(), new SafeRunnable() { @Override - public void run() { + public void doRun() { entityCaches.persistAll(); } }); diff --git a/orm/src/test/java/com/zfoo/orm/lpmap/ConcurrentFileChannelMapTest.java b/orm/src/test/java/com/zfoo/orm/lpmap/ConcurrentFileChannelMapTest.java index 667c0783..a167caed 100644 --- a/orm/src/test/java/com/zfoo/orm/lpmap/ConcurrentFileChannelMapTest.java +++ b/orm/src/test/java/com/zfoo/orm/lpmap/ConcurrentFileChannelMapTest.java @@ -15,6 +15,7 @@ package com.zfoo.orm.lpmap; import com.zfoo.event.manager.EventBus; import com.zfoo.orm.lpmap.model.MyPacket; import com.zfoo.protocol.ProtocolManager; +import com.zfoo.util.SafeRunnable; import org.junit.Assert; import org.junit.Ignore; import org.junit.Test; @@ -25,7 +26,7 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicInteger; /** - * @author jaysunxiao + * @author godotg * @version 3.0 */ @Ignore @@ -41,9 +42,9 @@ public class ConcurrentFileChannelMapTest { var countdown = new CountDownLatch(EventBus.EXECUTORS_SIZE); for (int i = 0; i < EventBus.EXECUTORS_SIZE; i++) { - EventBus.asyncExecute().execute(new Runnable() { + EventBus.asyncExecute(new SafeRunnable() { @Override - public void run() { + public void doRun() { var key = atomicInt.getAndIncrement(); while (key < count) { var myPacket = MyPacket.valueOf(key, String.valueOf(key)); diff --git a/orm/src/test/java/com/zfoo/orm/lpmap/ConcurrentHeapMapTest.java b/orm/src/test/java/com/zfoo/orm/lpmap/ConcurrentHeapMapTest.java index fa35120d..ce9895d5 100644 --- a/orm/src/test/java/com/zfoo/orm/lpmap/ConcurrentHeapMapTest.java +++ b/orm/src/test/java/com/zfoo/orm/lpmap/ConcurrentHeapMapTest.java @@ -15,6 +15,7 @@ package com.zfoo.orm.lpmap; import com.zfoo.event.manager.EventBus; import com.zfoo.orm.lpmap.model.MyPacket; import com.zfoo.protocol.ProtocolManager; +import com.zfoo.util.SafeRunnable; import org.junit.Assert; import org.junit.Ignore; import org.junit.Test; @@ -25,7 +26,7 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicInteger; /** - * @author jaysunxiao + * @author godotg * @version 3.0 */ @Ignore @@ -58,9 +59,9 @@ public class ConcurrentHeapMapTest { var countdown = new CountDownLatch(EventBus.EXECUTORS_SIZE); for (int i = 0; i < EventBus.EXECUTORS_SIZE; i++) { - EventBus.asyncExecute().execute(new Runnable() { + EventBus.asyncExecute(new SafeRunnable() { @Override - public void run() { + public void doRun() { var key = atomicInt.getAndIncrement(); while (key < count) { var myPacket = MyPacket.valueOf(key, String.valueOf(key)); diff --git a/scheduler/src/main/java/com/zfoo/scheduler/manager/SchedulerBus.java b/scheduler/src/main/java/com/zfoo/scheduler/manager/SchedulerBus.java index 8abb8352..f3b0902b 100644 --- a/scheduler/src/main/java/com/zfoo/scheduler/manager/SchedulerBus.java +++ b/scheduler/src/main/java/com/zfoo/scheduler/manager/SchedulerBus.java @@ -14,22 +14,23 @@ package com.zfoo.scheduler.manager; import com.zfoo.protocol.collection.CollectionUtils; +import com.zfoo.protocol.util.StringUtils; import com.zfoo.scheduler.SchedulerContext; import com.zfoo.scheduler.model.vo.SchedulerDefinition; import com.zfoo.scheduler.util.TimeUtils; +import com.zfoo.util.SafeRunnable; +import io.netty.util.concurrent.FastThreadLocalThread; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.time.Instant; import java.time.ZonedDateTime; import java.util.List; -import java.util.concurrent.CopyOnWriteArrayList; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicInteger; /** - * @author jaysunxiao + * @author godotg * @version 3.0 */ public abstract class SchedulerBus { @@ -41,6 +42,9 @@ public abstract class SchedulerBus { * scheduler默认只有一个单线程的线程池 */ private static final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(new SchedulerThreadFactory(1)); + + private static long threadId = 0; + /** * 上一次trigger触发时间 */ @@ -63,6 +67,31 @@ public abstract class SchedulerBus { }, 0, TRIGGER_MILLIS_INTERVAL, TimeUnit.MILLISECONDS); } + public static class SchedulerThreadFactory implements ThreadFactory { + + private int poolNumber; + private final AtomicInteger threadNumber = new AtomicInteger(1); + private final ThreadGroup group; + + public SchedulerThreadFactory(int poolNumber) { + var s = System.getSecurityManager(); + group = (s != null) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup(); + this.poolNumber = poolNumber; + } + + @Override + public Thread newThread(Runnable runnable) { + var threadName = StringUtils.format("scheduler-p{}-t{}", poolNumber, threadNumber.getAndIncrement()); + var thread = new FastThreadLocalThread(group, runnable, threadName, 0); + thread.setDaemon(false); + thread.setPriority(Thread.NORM_PRIORITY); + thread.setUncaughtExceptionHandler((t, e) -> logger.error(t.toString(), e)); + threadId = thread.getId(); + return thread; + } + + } + public static void refreshMinTriggerTimestamp() { var minTimestamp = Long.MAX_VALUE; @@ -134,46 +163,24 @@ public abstract class SchedulerBus { /** * 不断执行的周期循环任务 */ - public static void scheduleAtFixedRate(Runnable runnable, long period, TimeUnit unit) { + public static void scheduleAtFixedRate(SafeRunnable runnable, long period, TimeUnit unit) { if (SchedulerContext.isStop()) { return; } - executor.scheduleAtFixedRate(new Runnable() { - @Override - public void run() { - try { - runnable.run(); - } catch (Exception e) { - logger.error("scheduleAtFixedRate未知exception异常", e); - } catch (Throwable t) { - logger.error("scheduleAtFixedRate未知error异常", t); - } - } - }, 0, period, unit); + executor.scheduleAtFixedRate(runnable, 0, period, unit); } /** * 固定延迟执行的任务 */ - public static void schedule(Runnable runnable, long delay, TimeUnit unit) { + public static void schedule(SafeRunnable runnable, long delay, TimeUnit unit) { if (SchedulerContext.isStop()) { return; } - executor.schedule(new Runnable() { - @Override - public void run() { - try { - runnable.run(); - } catch (Exception e) { - logger.error("schedule未知exception异常", e); - } catch (Throwable t) { - logger.error("schedule未知error异常", t); - } - } - }, delay, unit); + executor.schedule(runnable, delay, unit); } /** @@ -186,4 +193,8 @@ public abstract class SchedulerBus { schedulerDefList.add(SchedulerDefinition.valueOf(cron, runnable)); } + + public static Executor threadExecutor(long currentThreadId) { + return threadId == currentThreadId ? executor : null; + } } diff --git a/scheduler/src/main/java/com/zfoo/scheduler/manager/SchedulerThreadFactory.java b/scheduler/src/main/java/com/zfoo/scheduler/manager/SchedulerThreadFactory.java deleted file mode 100644 index 8a646b22..00000000 --- a/scheduler/src/main/java/com/zfoo/scheduler/manager/SchedulerThreadFactory.java +++ /dev/null @@ -1,50 +0,0 @@ -/* - * Copyright (C) 2020 The zfoo Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except - * in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed - * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and limitations under the License. - */ - -package com.zfoo.scheduler.manager; - -import io.netty.util.concurrent.FastThreadLocalThread; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.concurrent.ThreadFactory; -import java.util.concurrent.atomic.AtomicInteger; - -/** - * @author jaysunxiao - * @version 3.0 - */ -public class SchedulerThreadFactory implements ThreadFactory { - - private static final Logger logger = LoggerFactory.getLogger(SchedulerThreadFactory.class); - - private final ThreadGroup group; - private final AtomicInteger threadNumber = new AtomicInteger(1); - private final String namePrefix; - - public SchedulerThreadFactory(int poolNumber) { - var s = System.getSecurityManager(); - group = (s != null) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup(); - namePrefix = "scheduler-p" + poolNumber + "-t"; - } - - @Override - public Thread newThread(Runnable runnable) { - var t = new FastThreadLocalThread(group, runnable, namePrefix + threadNumber.getAndIncrement(), 0); - t.setDaemon(false); - t.setPriority(Thread.NORM_PRIORITY); - t.setUncaughtExceptionHandler((thread, e) -> logger.error(thread.toString(), e)); - return t; - } - -} diff --git a/net/src/main/java/com/zfoo/net/task/model/SafeRunnable.java b/util/src/main/java/com/zfoo/util/SafeRunnable.java similarity index 90% rename from net/src/main/java/com/zfoo/net/task/model/SafeRunnable.java rename to util/src/main/java/com/zfoo/util/SafeRunnable.java index 998485e8..61d8f654 100644 --- a/net/src/main/java/com/zfoo/net/task/model/SafeRunnable.java +++ b/util/src/main/java/com/zfoo/util/SafeRunnable.java @@ -1,10 +1,10 @@ -package com.zfoo.net.task.model; +package com.zfoo.util; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** - * @author jaysunxiao + * @author godotg * @version 3.0 */ public abstract class SafeRunnable implements Runnable { diff --git a/util/src/main/java/com/zfoo/util/math/ConsistentHash.java b/util/src/main/java/com/zfoo/util/math/ConsistentHash.java index a3f95a26..a5499df4 100644 --- a/util/src/main/java/com/zfoo/util/math/ConsistentHash.java +++ b/util/src/main/java/com/zfoo/util/math/ConsistentHash.java @@ -23,7 +23,7 @@ import java.util.TreeMap; /** * 带虚拟节点的一致性Hash算法,参考:http://www.zsythink.net/archives/1182 * - * @author jaysunxiao + * @author godotg * @version 3.0 */ @@ -35,7 +35,7 @@ public class ConsistentHash { // 虚拟节点,key表示虚拟节点的hash值,value表示虚拟节点的名称 private TreeMap> virtualNodeTreeMap = new TreeMap<>(); - // 虚拟节点的数目,数量越大约均匀,经验值150 + // 虚拟节点的数目,数量越大约均匀 private int virtualNodes = 0; public ConsistentHash(List> realNodes, int virtualNodes) {