ref[executor]: executor register and get

This commit is contained in:
godotg
2024-06-25 11:37:26 +08:00
parent c3e275a788
commit b23ded4fbf
4 changed files with 20 additions and 36 deletions
@@ -17,7 +17,6 @@ import com.zfoo.event.model.ExceptionEvent;
import com.zfoo.event.model.IEvent;
import com.zfoo.event.model.TripleConsumer;
import com.zfoo.protocol.collection.CollectionUtils;
import com.zfoo.protocol.collection.concurrent.CopyOnWriteHashMapLongObject;
import com.zfoo.protocol.util.AssertionUtils;
import com.zfoo.protocol.util.RandomUtils;
import com.zfoo.protocol.util.StringUtils;
@@ -30,7 +29,6 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
@@ -52,7 +50,6 @@ public abstract class EventBus {
private static final ExecutorService[] executors = new ExecutorService[EXECUTORS_SIZE];
private static final CopyOnWriteHashMapLongObject<ExecutorService> threadMap = new CopyOnWriteHashMapLongObject<>(EXECUTORS_SIZE);
/**
* event mapping
*/
@@ -94,7 +91,7 @@ public abstract class EventBus {
thread.setUncaughtExceptionHandler((t, e) -> logger.error(t.toString(), e));
var executor = executors[poolNumber];
AssertionUtils.notNull(executor);
threadMap.put(thread.getId(), executor);
ThreadUtils.registerExecutor(thread.getId(), executor);
return thread;
}
}
@@ -154,9 +151,6 @@ public abstract class EventBus {
receiverMap.computeIfAbsent(eventType, it -> new ArrayList<>(1)).add(receiver);
}
public static Executor threadExecutor(long currentThreadId) {
return threadMap.getPrimitive(currentThreadId);
}
}
@@ -13,14 +13,11 @@
package com.zfoo.net.task;
import com.zfoo.event.manager.EventBus;
import com.zfoo.net.NetContext;
import com.zfoo.protocol.collection.concurrent.CopyOnWriteHashMapLongObject;
import com.zfoo.protocol.util.AssertionUtils;
import com.zfoo.protocol.util.RandomUtils;
import com.zfoo.protocol.util.StringUtils;
import com.zfoo.protocol.util.ThreadUtils;
import com.zfoo.scheduler.manager.SchedulerBus;
import io.netty.util.concurrent.FastThreadLocalThread;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -66,8 +63,6 @@ public final class TaskBus {
}
}
private static final CopyOnWriteHashMapLongObject<ExecutorService> threadMap = new CopyOnWriteHashMapLongObject<>(EXECUTOR_SIZE);
public static class TaskThreadFactory implements ThreadFactory {
private final int poolNumber;
private final AtomicInteger threadNumber = new AtomicInteger(1);
@@ -87,7 +82,7 @@ public final class TaskBus {
thread.setUncaughtExceptionHandler((t, e) -> logger.error(t.toString(), e));
var executor = executors[poolNumber];
AssertionUtils.notNull(executor);
threadMap.put(thread.getId(), executor);
ThreadUtils.registerExecutor(thread.getId(), executor);
return thread;
}
}
@@ -121,22 +116,11 @@ public final class TaskBus {
// 在taskeventscheduler线程执行的异步请求,请求成功过后依然在相同的线程执行回调任务
public static Executor currentThreadExecutor() {
var threadId = Thread.currentThread().getId();
var taskExecutor = threadMap.getPrimitive(threadId);
if (taskExecutor != null) {
return taskExecutor;
var executor = ThreadUtils.executorByThreadId(threadId);
if (executor == null) {
return executors[calTaskExecutorIndex(RandomUtils.randomInt())];
}
var eventExecutor = EventBus.threadExecutor(threadId);
if (eventExecutor != null) {
return eventExecutor;
}
var schedulerExecutor = SchedulerBus.threadExecutor(threadId);
if (schedulerExecutor != null) {
return schedulerExecutor;
}
return executors[calTaskExecutorIndex(RandomUtils.randomInt())];
return executor;
}
}
@@ -12,10 +12,12 @@
package com.zfoo.protocol.util;
import com.zfoo.protocol.collection.concurrent.CopyOnWriteHashMapLongObject;
import io.netty.util.concurrent.EventExecutorGroup;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;
@@ -124,4 +126,15 @@ public abstract class ThreadUtils {
};
}
// -----------------------------------------------------------------------------------------------------------------
// threadId -> Executor
private static final CopyOnWriteHashMapLongObject<Executor> threadExecutorMap = new CopyOnWriteHashMapLongObject<>(Runtime.getRuntime().availableProcessors() * 8);
public static void registerExecutor(long threadId, Executor executor) {
threadExecutorMap.put(threadId, executor);
}
public static Executor executorByThreadId(long threadId) {
return threadExecutorMap.get(threadId);
}
}
@@ -43,10 +43,6 @@ public abstract class SchedulerBus {
*/
private static final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(new SchedulerThreadFactory(1));
/**
* executor创建的线程id号
*/
private static long threadId = 0;
/**
* 上一次trigger触发时间
@@ -87,7 +83,7 @@ public abstract class SchedulerBus {
thread.setDaemon(false);
thread.setPriority(Thread.NORM_PRIORITY);
thread.setUncaughtExceptionHandler((t, e) -> logger.error(t.toString(), e));
threadId = thread.getId();
ThreadUtils.registerExecutor(thread.getId(), executor);
return thread;
}
@@ -191,7 +187,4 @@ public abstract class SchedulerBus {
registerScheduler(SchedulerDefinition.valueOf(cron, runnable));
}
public static Executor threadExecutor(long currentThreadId) {
return threadId == currentThreadId ? executor : null;
}
}