From b23ded4fbfd7abc7ee5e6f3dd8cb266dec08a9f9 Mon Sep 17 00:00:00 2001 From: godotg Date: Tue, 25 Jun 2024 11:37:26 +0800 Subject: [PATCH] ref[executor]: executor register and get --- .../java/com/zfoo/event/manager/EventBus.java | 8 +----- .../main/java/com/zfoo/net/task/TaskBus.java | 26 ++++--------------- .../com/zfoo/protocol/util/ThreadUtils.java | 13 ++++++++++ .../zfoo/scheduler/manager/SchedulerBus.java | 9 +------ 4 files changed, 20 insertions(+), 36 deletions(-) diff --git a/event/src/main/java/com/zfoo/event/manager/EventBus.java b/event/src/main/java/com/zfoo/event/manager/EventBus.java index c95fe3c4..c50ce6bf 100644 --- a/event/src/main/java/com/zfoo/event/manager/EventBus.java +++ b/event/src/main/java/com/zfoo/event/manager/EventBus.java @@ -17,7 +17,6 @@ import com.zfoo.event.model.ExceptionEvent; import com.zfoo.event.model.IEvent; import com.zfoo.event.model.TripleConsumer; import com.zfoo.protocol.collection.CollectionUtils; -import com.zfoo.protocol.collection.concurrent.CopyOnWriteHashMapLongObject; import com.zfoo.protocol.util.AssertionUtils; import com.zfoo.protocol.util.RandomUtils; import com.zfoo.protocol.util.StringUtils; @@ -30,7 +29,6 @@ import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.ThreadFactory; @@ -52,7 +50,6 @@ public abstract class EventBus { private static final ExecutorService[] executors = new ExecutorService[EXECUTORS_SIZE]; - private static final CopyOnWriteHashMapLongObject threadMap = new CopyOnWriteHashMapLongObject<>(EXECUTORS_SIZE); /** * event mapping */ @@ -94,7 +91,7 @@ public abstract class EventBus { thread.setUncaughtExceptionHandler((t, e) -> logger.error(t.toString(), e)); var executor = executors[poolNumber]; AssertionUtils.notNull(executor); - threadMap.put(thread.getId(), executor); + ThreadUtils.registerExecutor(thread.getId(), executor); return thread; } } @@ -154,9 +151,6 @@ public abstract class EventBus { receiverMap.computeIfAbsent(eventType, it -> new ArrayList<>(1)).add(receiver); } - public static Executor threadExecutor(long currentThreadId) { - return threadMap.getPrimitive(currentThreadId); - } } diff --git a/net/src/main/java/com/zfoo/net/task/TaskBus.java b/net/src/main/java/com/zfoo/net/task/TaskBus.java index 0928e5c9..3d2fdbed 100644 --- a/net/src/main/java/com/zfoo/net/task/TaskBus.java +++ b/net/src/main/java/com/zfoo/net/task/TaskBus.java @@ -13,14 +13,11 @@ package com.zfoo.net.task; -import com.zfoo.event.manager.EventBus; import com.zfoo.net.NetContext; -import com.zfoo.protocol.collection.concurrent.CopyOnWriteHashMapLongObject; import com.zfoo.protocol.util.AssertionUtils; import com.zfoo.protocol.util.RandomUtils; import com.zfoo.protocol.util.StringUtils; import com.zfoo.protocol.util.ThreadUtils; -import com.zfoo.scheduler.manager.SchedulerBus; import io.netty.util.concurrent.FastThreadLocalThread; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -66,8 +63,6 @@ public final class TaskBus { } } - private static final CopyOnWriteHashMapLongObject threadMap = new CopyOnWriteHashMapLongObject<>(EXECUTOR_SIZE); - public static class TaskThreadFactory implements ThreadFactory { private final int poolNumber; private final AtomicInteger threadNumber = new AtomicInteger(1); @@ -87,7 +82,7 @@ public final class TaskBus { thread.setUncaughtExceptionHandler((t, e) -> logger.error(t.toString(), e)); var executor = executors[poolNumber]; AssertionUtils.notNull(executor); - threadMap.put(thread.getId(), executor); + ThreadUtils.registerExecutor(thread.getId(), executor); return thread; } } @@ -121,22 +116,11 @@ public final class TaskBus { // 在task,event,scheduler线程执行的异步请求,请求成功过后依然在相同的线程执行回调任务 public static Executor currentThreadExecutor() { var threadId = Thread.currentThread().getId(); - var taskExecutor = threadMap.getPrimitive(threadId); - if (taskExecutor != null) { - return taskExecutor; + var executor = ThreadUtils.executorByThreadId(threadId); + if (executor == null) { + return executors[calTaskExecutorIndex(RandomUtils.randomInt())]; } - - var eventExecutor = EventBus.threadExecutor(threadId); - if (eventExecutor != null) { - return eventExecutor; - } - - var schedulerExecutor = SchedulerBus.threadExecutor(threadId); - if (schedulerExecutor != null) { - return schedulerExecutor; - } - - return executors[calTaskExecutorIndex(RandomUtils.randomInt())]; + return executor; } } diff --git a/protocol/src/main/java/com/zfoo/protocol/util/ThreadUtils.java b/protocol/src/main/java/com/zfoo/protocol/util/ThreadUtils.java index 0accd934..f4bcee06 100644 --- a/protocol/src/main/java/com/zfoo/protocol/util/ThreadUtils.java +++ b/protocol/src/main/java/com/zfoo/protocol/util/ThreadUtils.java @@ -12,10 +12,12 @@ package com.zfoo.protocol.util; +import com.zfoo.protocol.collection.concurrent.CopyOnWriteHashMapLongObject; import io.netty.util.concurrent.EventExecutorGroup; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; import java.util.concurrent.ForkJoinPool; import java.util.concurrent.TimeUnit; @@ -124,4 +126,15 @@ public abstract class ThreadUtils { }; } + // ----------------------------------------------------------------------------------------------------------------- + // threadId -> Executor + private static final CopyOnWriteHashMapLongObject threadExecutorMap = new CopyOnWriteHashMapLongObject<>(Runtime.getRuntime().availableProcessors() * 8); + public static void registerExecutor(long threadId, Executor executor) { + threadExecutorMap.put(threadId, executor); + } + + public static Executor executorByThreadId(long threadId) { + return threadExecutorMap.get(threadId); + } + } diff --git a/scheduler/src/main/java/com/zfoo/scheduler/manager/SchedulerBus.java b/scheduler/src/main/java/com/zfoo/scheduler/manager/SchedulerBus.java index 25399ae2..64859d7a 100644 --- a/scheduler/src/main/java/com/zfoo/scheduler/manager/SchedulerBus.java +++ b/scheduler/src/main/java/com/zfoo/scheduler/manager/SchedulerBus.java @@ -43,10 +43,6 @@ public abstract class SchedulerBus { */ private static final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(new SchedulerThreadFactory(1)); - /** - * executor创建的线程id号 - */ - private static long threadId = 0; /** * 上一次trigger触发时间 @@ -87,7 +83,7 @@ public abstract class SchedulerBus { thread.setDaemon(false); thread.setPriority(Thread.NORM_PRIORITY); thread.setUncaughtExceptionHandler((t, e) -> logger.error(t.toString(), e)); - threadId = thread.getId(); + ThreadUtils.registerExecutor(thread.getId(), executor); return thread; } @@ -191,7 +187,4 @@ public abstract class SchedulerBus { registerScheduler(SchedulerDefinition.valueOf(cron, runnable)); } - public static Executor threadExecutor(long currentThreadId) { - return threadId == currentThreadId ? executor : null; - } }