diff --git a/net/src/main/java/com/zfoo/net/consumer/Consumer.java b/net/src/main/java/com/zfoo/net/consumer/Consumer.java index 172fc447..2e778d3f 100644 --- a/net/src/main/java/com/zfoo/net/consumer/Consumer.java +++ b/net/src/main/java/com/zfoo/net/consumer/Consumer.java @@ -26,14 +26,13 @@ import com.zfoo.net.router.exception.ErrorResponseException; import com.zfoo.net.router.exception.NetTimeOutException; import com.zfoo.net.router.exception.UnexpectedProtocolException; import com.zfoo.net.router.route.SignalBridge; +import com.zfoo.net.task.TaskBus; import com.zfoo.protocol.IPacket; import com.zfoo.protocol.ProtocolManager; import com.zfoo.protocol.collection.CollectionUtils; import com.zfoo.protocol.registration.ProtocolModule; import com.zfoo.protocol.util.JsonUtils; import com.zfoo.protocol.util.StringUtils; -import com.zfoo.util.math.HashUtils; -import com.zfoo.util.math.RandomUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -47,7 +46,7 @@ import java.util.concurrent.TimeoutException; *
* 在clientSession中选择一个可用的session,最终还是调用的IRouter中的方法 * - * @author jaysunxiao + * @author godotg * @version 3.0 */ public class Consumer implements IConsumer { @@ -78,8 +77,8 @@ public class Consumer implements IConsumer { try { var loadBalancer = loadBalancer(ProtocolManager.moduleByProtocolId(packet.protocolId())); var session = loadBalancer.loadBalancer(packet, argument); - var executorConsistentHash = (argument == null) ? RandomUtils.randomInt() : HashUtils.fnvHash(argument); - NetContext.getRouter().send(session, packet, NoAnswerAttachment.valueOf(executorConsistentHash)); + var taskExecutorHash = TaskBus.calTaskExecutorHash(argument); + NetContext.getRouter().send(session, packet, NoAnswerAttachment.valueOf(taskExecutorHash)); } catch (Throwable t) { logger.error("consumer发送未知异常", t); } @@ -93,8 +92,8 @@ public class Consumer implements IConsumer { // 下面的代码逻辑同Router的syncAsk,如果修改的话,记得一起修改 var clientSignalAttachment = new SignalAttachment(); - var executorConsistentHash = (argument == null) ? RandomUtils.randomInt() : HashUtils.fnvHash(argument); - clientSignalAttachment.setExecutorConsistentHash(executorConsistentHash); + var taskExecutorHash = TaskBus.calTaskExecutorHash(argument); + clientSignalAttachment.setTaskExecutorHash(taskExecutorHash); try { SignalBridge.addSignalAttachment(clientSignalAttachment); diff --git a/net/src/main/java/com/zfoo/net/consumer/IConsumer.java b/net/src/main/java/com/zfoo/net/consumer/IConsumer.java index c1f33e5c..3f9a45bb 100644 --- a/net/src/main/java/com/zfoo/net/consumer/IConsumer.java +++ b/net/src/main/java/com/zfoo/net/consumer/IConsumer.java @@ -21,7 +21,7 @@ import com.zfoo.protocol.registration.ProtocolModule; import org.springframework.lang.Nullable; /** - * @author jaysunxiao + * @author godotg * @version 3.0 */ public interface IConsumer { diff --git a/net/src/main/java/com/zfoo/net/consumer/balancer/AbstractConsumerLoadBalancer.java b/net/src/main/java/com/zfoo/net/consumer/balancer/AbstractConsumerLoadBalancer.java index 6ee263e3..399521cf 100644 --- a/net/src/main/java/com/zfoo/net/consumer/balancer/AbstractConsumerLoadBalancer.java +++ b/net/src/main/java/com/zfoo/net/consumer/balancer/AbstractConsumerLoadBalancer.java @@ -19,7 +19,6 @@ import com.zfoo.net.session.model.AttributeType; import com.zfoo.net.session.model.Session; import com.zfoo.protocol.IPacket; import com.zfoo.protocol.ProtocolManager; -import com.zfoo.protocol.model.Pair; import com.zfoo.protocol.registration.ProtocolModule; import com.zfoo.protocol.util.StringUtils; @@ -29,7 +28,7 @@ import java.util.Objects; import java.util.stream.Collectors; /** - * @author jaysunxiao + * @author godotg * @version 3.0 */ public abstract class AbstractConsumerLoadBalancer implements IConsumerLoadBalancer { diff --git a/net/src/main/java/com/zfoo/net/consumer/balancer/ConsistentHashConsumerLoadBalancer.java b/net/src/main/java/com/zfoo/net/consumer/balancer/ConsistentHashConsumerLoadBalancer.java index 788f81a5..0a0470f6 100644 --- a/net/src/main/java/com/zfoo/net/consumer/balancer/ConsistentHashConsumerLoadBalancer.java +++ b/net/src/main/java/com/zfoo/net/consumer/balancer/ConsistentHashConsumerLoadBalancer.java @@ -35,7 +35,7 @@ import java.util.stream.Collectors; *
* 通过argument计算一致性hash
*
- * @author jaysunxiao
+ * @author godotg
* @version 3.0
*/
public class ConsistentHashConsumerLoadBalancer extends AbstractConsumerLoadBalancer {
diff --git a/net/src/main/java/com/zfoo/net/consumer/balancer/IConsumerLoadBalancer.java b/net/src/main/java/com/zfoo/net/consumer/balancer/IConsumerLoadBalancer.java
index 85f3f92a..b631bf3e 100644
--- a/net/src/main/java/com/zfoo/net/consumer/balancer/IConsumerLoadBalancer.java
+++ b/net/src/main/java/com/zfoo/net/consumer/balancer/IConsumerLoadBalancer.java
@@ -19,7 +19,7 @@ import com.zfoo.protocol.IPacket;
import org.springframework.lang.Nullable;
/**
- * @author jaysunxiao
+ * @author godotg
* @version 3.0
*/
public interface IConsumerLoadBalancer {
diff --git a/net/src/main/java/com/zfoo/net/consumer/balancer/RandomConsumerLoadBalancer.java b/net/src/main/java/com/zfoo/net/consumer/balancer/RandomConsumerLoadBalancer.java
index 3d1d7d3e..43fe3948 100644
--- a/net/src/main/java/com/zfoo/net/consumer/balancer/RandomConsumerLoadBalancer.java
+++ b/net/src/main/java/com/zfoo/net/consumer/balancer/RandomConsumerLoadBalancer.java
@@ -22,7 +22,7 @@ import com.zfoo.util.math.RandomUtils;
/**
* 随机负载均衡器,任选服务提供者的其中之一
*
- * @author jaysunxiao
+ * @author godotg
* @version 3.0
*/
public class RandomConsumerLoadBalancer extends AbstractConsumerLoadBalancer {
diff --git a/net/src/main/java/com/zfoo/net/consumer/balancer/ShortestTimeConsumerLoadBalancer.java b/net/src/main/java/com/zfoo/net/consumer/balancer/ShortestTimeConsumerLoadBalancer.java
index 6dda500e..c59a9cb0 100644
--- a/net/src/main/java/com/zfoo/net/consumer/balancer/ShortestTimeConsumerLoadBalancer.java
+++ b/net/src/main/java/com/zfoo/net/consumer/balancer/ShortestTimeConsumerLoadBalancer.java
@@ -27,7 +27,7 @@ import java.util.concurrent.ConcurrentHashMap;
/**
* 最少时间调用负载均衡器,优先选择调用时间最短的session
*
- * @author jaysunxiao
+ * @author godotg
* @version 3.0
*/
public class ShortestTimeConsumerLoadBalancer extends AbstractConsumerLoadBalancer {
diff --git a/net/src/main/java/com/zfoo/net/consumer/event/ConsumerStartEvent.java b/net/src/main/java/com/zfoo/net/consumer/event/ConsumerStartEvent.java
index 669316e5..56b29bf3 100644
--- a/net/src/main/java/com/zfoo/net/consumer/event/ConsumerStartEvent.java
+++ b/net/src/main/java/com/zfoo/net/consumer/event/ConsumerStartEvent.java
@@ -18,7 +18,7 @@ import com.zfoo.net.consumer.registry.RegisterVO;
import com.zfoo.net.session.model.Session;
/**
- * @author jaysunxiao
+ * @author godotg
* @version 3.0
*/
public class ConsumerStartEvent implements IEvent {
diff --git a/net/src/main/java/com/zfoo/net/consumer/registry/IRegistry.java b/net/src/main/java/com/zfoo/net/consumer/registry/IRegistry.java
index a86ed023..048b185a 100644
--- a/net/src/main/java/com/zfoo/net/consumer/registry/IRegistry.java
+++ b/net/src/main/java/com/zfoo/net/consumer/registry/IRegistry.java
@@ -22,7 +22,7 @@ import java.util.function.BiConsumer;
import java.util.function.Consumer;
/**
- * @author jaysunxiao
+ * @author godotg
* @version 3.0
*/
public interface IRegistry {
diff --git a/net/src/main/java/com/zfoo/net/consumer/registry/RegisterVO.java b/net/src/main/java/com/zfoo/net/consumer/registry/RegisterVO.java
index ffaaee8c..0f5fa235 100644
--- a/net/src/main/java/com/zfoo/net/consumer/registry/RegisterVO.java
+++ b/net/src/main/java/com/zfoo/net/consumer/registry/RegisterVO.java
@@ -32,7 +32,7 @@ import java.util.Objects;
import java.util.stream.Collectors;
/**
- * @author jaysunxiao
+ * @author godotg
* @version 3.0
*/
public class RegisterVO {
diff --git a/net/src/main/java/com/zfoo/net/handler/GatewayRouteHandler.java b/net/src/main/java/com/zfoo/net/handler/GatewayRouteHandler.java
index 6a9e8301..45fb2c1c 100644
--- a/net/src/main/java/com/zfoo/net/handler/GatewayRouteHandler.java
+++ b/net/src/main/java/com/zfoo/net/handler/GatewayRouteHandler.java
@@ -87,7 +87,7 @@ public class GatewayRouteHandler extends ServerRouteHandler {
// 例子:以聊天服务来说,玩家知道自己在哪个群组groupId中,那往这个群发送消息时,会在Packet中带上这个groupId做为一致性hash就可以了。
if (packet instanceof IGatewayLoadBalancer) {
var loadBalancerConsistentHashObject = ((IGatewayLoadBalancer) packet).loadBalancerConsistentHashObject();
- gatewayAttachment.useExecutorConsistentHash(loadBalancerConsistentHashObject);
+ gatewayAttachment.wrapTaskExecutorHash(loadBalancerConsistentHashObject);
forwardingPacket(packet, gatewayAttachment, loadBalancerConsistentHashObject);
return;
} else {
diff --git a/net/src/main/java/com/zfoo/net/router/Router.java b/net/src/main/java/com/zfoo/net/router/Router.java
index cc65c7d6..1b059982 100644
--- a/net/src/main/java/com/zfoo/net/router/Router.java
+++ b/net/src/main/java/com/zfoo/net/router/Router.java
@@ -40,8 +40,6 @@ import com.zfoo.protocol.IPacket;
import com.zfoo.protocol.exception.ExceptionUtils;
import com.zfoo.protocol.util.JsonUtils;
import com.zfoo.protocol.util.StringUtils;
-import com.zfoo.util.math.HashUtils;
-import com.zfoo.util.math.RandomUtils;
import io.netty.util.concurrent.FastThreadLocal;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -96,8 +94,7 @@ public class Router implements IRouter {
// 这里会让之前的CompletableFuture得到结果,从而像asyncAsk之类的回调到结果
removedAttachment.getResponseFuture().complete(packet);
} else {
- logger.error("client receives packet:[{}] and attachment:[{}] from server, but clientAttachmentMap has no attachment, perhaps timeout exception."
- , JsonUtils.object2String(packet), JsonUtils.object2String(attachment));
+ logger.error("client receives packet:[{}] and attachment:[{}] from server, but clientAttachmentMap has no attachment, perhaps timeout exception.", JsonUtils.object2String(packet), JsonUtils.object2String(attachment));
}
// 注意:这个return,这样子,asyncAsk的结果就返回了。
@@ -137,9 +134,7 @@ public class Router implements IRouter {
}
send(gatewaySession, packet, signalAttachmentInGatewayAttachment);
} else {
- logger.error("gateway receives packet:[{}] and attachment:[{}] from server" +
- ", but serverSessionMap has no session[id:{}], perhaps client disconnected from gateway."
- , JsonUtils.object2String(packet), JsonUtils.object2String(attachment), gatewayAttachment.getSid());
+ logger.error("gateway receives packet:[{}] and attachment:[{}] from server" + ", but serverSessionMap has no session[id:{}], perhaps client disconnected from gateway.", JsonUtils.object2String(packet), JsonUtils.object2String(attachment), gatewayAttachment.getSid());
}
return;
}
@@ -196,8 +191,8 @@ public class Router implements IRouter {
@Override
public
* zfoo中通过对线程池的细粒度控制,从而实现了Actor模型。
* 为了简单,可以把Actor可以理解为一个用户或者一个玩家。
- * 因为同一个用户或者玩家的uid是固定的,通过uid去计算一致性hash(executorConsistentHash)永远会得到一致的结果,
- * 从而保证同一个用户或者玩家的请求总能通过executorConsistentHash被路由到同一台服务器的同一个线程去执行,从而避免了并发,实现了无锁化。
+ * 因为同一个用户或者玩家的uid是固定的,通过uid去计算一致性hash(taskExecutorHash)永远会得到一致的结果,
+ * 从而保证同一个用户或者玩家的请求总能通过taskExecutorHash被路由到同一台服务器的同一个线程去执行,从而避免了并发,实现了无锁化。
*
- * zfoo所代表的Actor模型,是更加精简的Actor模型,让上层调用无感知,在zfoo中可以简单的理解 actor = executorConsistentHash。
+ * zfoo所代表的Actor模型,是更加精简的Actor模型,让上层调用无感知,在zfoo中可以简单的理解 actor = taskExecutorHash。
*
* 在zfoo这套线程模型中,保证了服务器所接收到的Packet(最终被包装成PacketReceiverTask任务),永远只会在同一条线程处理,
- * TaskBus通过AbstractTaskDispatch去派发PacketReceiverTask任务,具体在哪个线程处理通过IAttachment的executorConsistentHash计算。
+ * TaskBus通过AbstractTaskDispatch去派发PacketReceiverTask任务,具体在哪个线程处理通过IAttachment的taskExecutorHash计算。
*
- * IAttachment的不同,executorConsistentHash也不同:
- * GatewayAttachment:默认是executorConsistentHash等于用户活玩家的uid,也可以通过IGatewayLoadBalancer接口指定
- * SignalAttachment:executorConsistentHash通过IRouter和IConsumer的argument参数指定
+ * IAttachment的不同,taskExecutorHash也不同:
+ * GatewayAttachment:默认是taskExecutorHash等于用户活玩家的uid,也可以通过IGatewayLoadBalancer接口指定
+ * SignalAttachment:taskExecutorHash通过IRouter和IConsumer的argument参数指定
*/
public static void dispatch(PacketReceiverTask task) {
var attachment = task.getAttachment();
@@ -121,16 +121,24 @@ public final class TaskBus {
execute((int) uid, task);
}
} else {
- execute(attachment.executorConsistentHash(), task);
+ execute(attachment.taskExecutorHash(), task);
}
}
- public static int executorIndex(int executorConsistentHash) {
- return Math.abs(executorConsistentHash % EXECUTOR_SIZE);
+ public static int calTaskExecutorHash(int taskExecutorHash) {
+ return Math.abs(taskExecutorHash) % EXECUTOR_SIZE;
}
- public static void execute(int executorConsistentHash, Runnable runnable) {
- executors[executorIndex(executorConsistentHash)].execute(SafeRunnable.valueOf(runnable));
+ public static int calTaskExecutorHash(Object argument) {
+ return calTaskExecutorHash((argument == null) ? RandomUtils.randomInt() : argument.hashCode());
+ }
+
+ public static void execute(int taskExecutorHash, Runnable runnable) {
+ executors[calTaskExecutorHash(taskExecutorHash)].execute(SafeRunnable.valueOf(runnable));
+ }
+
+ public static void execute(Object argument, Runnable runnable) {
+ execute(calTaskExecutorHash(argument), runnable);
}
// 在task,event,scheduler线程执行的异步请求,请求成功过后依然在相同的线程执行回调任务
@@ -151,7 +159,7 @@ public final class TaskBus {
return schedulerExecutor;
}
- return executors[executorIndex(RandomUtils.randomInt())];
+ return executors[calTaskExecutorHash(RandomUtils.randomInt())];
}
}