perf[orm]: find thread by threadExecutorMap

This commit is contained in:
jaysunxiao
2024-09-23 19:34:48 +08:00
parent 6dd5b3ac65
commit 082c2fa581
6 changed files with 37 additions and 31 deletions
@@ -93,7 +93,7 @@ public abstract class EventBus {
thread.setUncaughtExceptionHandler((t, e) -> logger.error(t.toString(), e));
var executor = executors[poolNumber];
AssertionUtils.notNull(executor);
ThreadUtils.registerExecutor(thread.getId(), executor);
ThreadUtils.registerSingleThreadExecutor(thread, executor);
return thread;
}
}
@@ -86,7 +86,7 @@ public final class TaskBus {
thread.setUncaughtExceptionHandler((t, e) -> logger.error(t.toString(), e));
var executor = executors[poolNumber];
AssertionUtils.notNull(executor);
ThreadUtils.registerExecutor(thread.getId(), executor);
ThreadUtils.registerSingleThreadExecutor(thread, executor);
return thread;
}
}
+2 -2
View File
@@ -24,8 +24,8 @@ import java.util.function.BiConsumer;
public interface IEntityCache<PK extends Comparable<PK>, E extends IEntity<PK>> {
/**
* EN: Load data from the database to the cache and return a default value with an empty ID if the database does not exist.
* CN: 从数据库中加载数据到缓存,如果数据库不存在则返回一个id为空的默认值
* EN: Load data from the database to the cache and return null if the database does not exist.
* CN: 从数据库中加载数据到缓存,如果数据库不存在则返回null
*/
@Nullable
E load(PK pk);
@@ -63,7 +63,7 @@ public abstract class MongoIdUtils {
return document.getLong("count") + 1;
}
} catch (Throwable t) {
logger.info("getIncrementIdFromMongo collection:[{}] document:[{}] default error", collectionName, documentName, t);
logger.error("getIncrementIdFromMongo collection:[{}] document:[{}] default error", collectionName, documentName, t);
}
var query = Filters.eq("_id", documentName);
@@ -75,7 +75,7 @@ public abstract class MongoIdUtils {
var document = collection.findOneAndUpdate(query, Updates.combine(inc, setOnInsert), new FindOneAndUpdateOptions().upsert(true));
return null == document ? INIT_ID : document.getLong(COUNT) + 1;
} catch (Throwable t) {
logger.info("getIncrementIdFromMongo collection:[{}] document:[{}] retry error! ", collectionName, documentName, t);
logger.error("getIncrementIdFromMongo collection:[{}] document:[{}] retry error! ", collectionName, documentName, t);
}
}
@@ -13,6 +13,7 @@
package com.zfoo.protocol.util;
import com.zfoo.protocol.collection.concurrent.CopyOnWriteHashMapLongObject;
import com.zfoo.protocol.model.Pair;
import io.netty.util.concurrent.EventExecutorGroup;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -40,25 +41,6 @@ public abstract class ThreadUtils {
}
}
/**
* 通过线程号寻找对应的线程
*/
public static Thread findThread(long threadId) {
var group = Thread.currentThread().getThreadGroup();
while (group != null) {
var threads = new Thread[group.activeCount() * 2];
var count = group.enumerate(threads, true);
for (var i = 0; i < count; i++) {
if (threadId == threads[i].getId()) {
return threads[i];
}
}
group = group.getParent();
}
return null;
}
public static void shutdown(ExecutorService executor) {
try {
if (!executor.isTerminated()) {
@@ -127,14 +109,38 @@ public abstract class ThreadUtils {
}
// -----------------------------------------------------------------------------------------------------------------
// threadId -> Executor
private static final CopyOnWriteHashMapLongObject<Executor> threadExecutorMap = new CopyOnWriteHashMapLongObject<>(Runtime.getRuntime().availableProcessors() * 8);
public static void registerExecutor(long threadId, Executor executor) {
threadExecutorMap.put(threadId, executor);
// threadId -> (Thread, Executor)
private static final CopyOnWriteHashMapLongObject<Pair<Thread, Executor>> threadExecutorMap = new CopyOnWriteHashMapLongObject<>(Runtime.getRuntime().availableProcessors() * 8);
public static void registerSingleThreadExecutor(Thread thread, Executor executor) {
threadExecutorMap.put(thread.getId(), new Pair<>(thread, executor));
}
public static Executor executorByThreadId(long threadId) {
return threadExecutorMap.get(threadId);
var threadExecutor = threadExecutorMap.getPrimitive(threadId);
return threadExecutor == null ? null : threadExecutor.getValue();
}
/**
* search for the corresponding thread by the thread id
*/
public static Thread findThread(long threadId) {
var threadExecutor = threadExecutorMap.getPrimitive(threadId);
if (threadExecutor != null) {
return threadExecutor.getKey();
}
var group = Thread.currentThread().getThreadGroup();
while (group != null) {
var threads = new Thread[group.activeCount() * 2];
var count = group.enumerate(threads, true);
for (var i = 0; i < count; i++) {
if (threadId == threads[i].getId()) {
return threads[i];
}
}
group = group.getParent();
}
return null;
}
}
@@ -83,7 +83,7 @@ public abstract class SchedulerBus {
thread.setDaemon(false);
thread.setPriority(Thread.NORM_PRIORITY);
thread.setUncaughtExceptionHandler((t, e) -> logger.error(t.toString(), e));
ThreadUtils.registerExecutor(thread.getId(), executor);
ThreadUtils.registerSingleThreadExecutor(thread, executor);
return thread;
}