perf[event]: 调整EventBus代码结构

This commit is contained in:
jaysunxiao
2021-06-26 10:04:06 +08:00
parent 83b803db51
commit 6bd65ed1a6
2 changed files with 40 additions and 31 deletions
@@ -17,7 +17,6 @@ import com.zfoo.event.model.event.IEvent;
import com.zfoo.event.model.vo.IEventReceiver;
import com.zfoo.protocol.collection.CollectionUtils;
import com.zfoo.util.math.RandomUtils;
import io.netty.util.concurrent.FastThreadLocalThread;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -28,8 +27,6 @@ import java.util.Map;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;
/**
* @author jaysunxiao
@@ -42,43 +39,18 @@ public abstract class EventBus {
// 线程池的大小
private static final int EXECUTORS_SIZE = Runtime.getRuntime().availableProcessors() * 2;
private static final ExecutorService[] executors;
private static final ExecutorService[] executors = new ExecutorService[EXECUTORS_SIZE];
private static final Map<Class<? extends IEvent>, List<IEventReceiver>> receiverMap;
private static final Map<Class<? extends IEvent>, List<IEventReceiver>> receiverMap = new HashMap<>();
static {
receiverMap = new HashMap<>();
executors = new ExecutorService[EXECUTORS_SIZE];
for (int i = 0; i < executors.length; i++) {
var namedThreadFactory = new EventThreadFactory();
var namedThreadFactory = new EventThreadFactory(i + 1);
executors[i] = Executors.newSingleThreadExecutor(namedThreadFactory);
}
}
private static class EventThreadFactory implements ThreadFactory {
private static final AtomicInteger poolNumber = new AtomicInteger(1);
private final ThreadGroup group;
private final AtomicInteger threadNumber = new AtomicInteger(1);
private final String namePrefix;
EventThreadFactory() {
var s = System.getSecurityManager();
group = (s != null) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup();
namePrefix = "event-p" + poolNumber.getAndIncrement() + "-t";
}
@Override
public Thread newThread(Runnable runnable) {
var t = new FastThreadLocalThread(group, runnable, namePrefix + threadNumber.getAndIncrement(), 0);
t.setDaemon(false);
t.setPriority(Thread.NORM_PRIORITY);
t.setUncaughtExceptionHandler((thread, e) -> logger.error(thread.toString(), e));
return t;
}
}
/**
* 同步抛出一个事件,会在当前线程中运行
*
@@ -0,0 +1,37 @@
package com.zfoo.event.manager;
import io.netty.util.concurrent.FastThreadLocalThread;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;
/**
* @author jaysunxiao
* @version 3.0
*/
public class EventThreadFactory implements ThreadFactory {
private static final Logger logger = LoggerFactory.getLogger(EventThreadFactory.class);
private final ThreadGroup group;
private final AtomicInteger threadNumber = new AtomicInteger(1);
private final String namePrefix;
public EventThreadFactory(int poolNumber) {
var s = System.getSecurityManager();
group = (s != null) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup();
namePrefix = "event-p" + poolNumber + "-t";
}
@Override
public Thread newThread(Runnable runnable) {
var t = new FastThreadLocalThread(group, runnable, namePrefix + threadNumber.getAndIncrement(), 0);
t.setDaemon(false);
t.setPriority(Thread.NORM_PRIORITY);
t.setUncaughtExceptionHandler((thread, e) -> logger.error(thread.toString(), e));
return t;
}
}