From 4d211f0a533c21c7eec6bf0f98f5ec264ef33456 Mon Sep 17 00:00:00 2001 From: godotg Date: Sun, 11 Dec 2022 19:53:57 +0800 Subject: [PATCH] ref[net]: fefactored task bus scheduling to use faster hashcode as the default calculation parameter --- .../java/com/zfoo/net/consumer/Consumer.java | 13 +++--- .../java/com/zfoo/net/consumer/IConsumer.java | 2 +- .../AbstractConsumerLoadBalancer.java | 3 +- .../ConsistentHashConsumerLoadBalancer.java | 2 +- .../balancer/IConsumerLoadBalancer.java | 2 +- .../balancer/RandomConsumerLoadBalancer.java | 2 +- .../ShortestTimeConsumerLoadBalancer.java | 2 +- .../consumer/event/ConsumerStartEvent.java | 2 +- .../zfoo/net/consumer/registry/IRegistry.java | 2 +- .../net/consumer/registry/RegisterVO.java | 2 +- .../zfoo/net/handler/GatewayRouteHandler.java | 2 +- .../main/java/com/zfoo/net/router/Router.java | 31 +++++--------- .../router/attachment/GatewayAttachment.java | 41 ++++++++----------- .../net/router/attachment/HttpAttachment.java | 37 ++++++++--------- .../net/router/attachment/IAttachment.java | 4 +- .../router/attachment/NoAnswerAttachment.java | 21 +++++----- .../router/attachment/SignalAttachment.java | 20 ++++----- .../net/router/attachment/UdpAttachment.java | 4 +- .../main/java/com/zfoo/net/task/TaskBus.java | 34 +++++++++------ 19 files changed, 102 insertions(+), 124 deletions(-) diff --git a/net/src/main/java/com/zfoo/net/consumer/Consumer.java b/net/src/main/java/com/zfoo/net/consumer/Consumer.java index 172fc447..2e778d3f 100644 --- a/net/src/main/java/com/zfoo/net/consumer/Consumer.java +++ b/net/src/main/java/com/zfoo/net/consumer/Consumer.java @@ -26,14 +26,13 @@ import com.zfoo.net.router.exception.ErrorResponseException; import com.zfoo.net.router.exception.NetTimeOutException; import com.zfoo.net.router.exception.UnexpectedProtocolException; import com.zfoo.net.router.route.SignalBridge; +import com.zfoo.net.task.TaskBus; import com.zfoo.protocol.IPacket; import com.zfoo.protocol.ProtocolManager; import com.zfoo.protocol.collection.CollectionUtils; import com.zfoo.protocol.registration.ProtocolModule; import com.zfoo.protocol.util.JsonUtils; import com.zfoo.protocol.util.StringUtils; -import com.zfoo.util.math.HashUtils; -import com.zfoo.util.math.RandomUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -47,7 +46,7 @@ import java.util.concurrent.TimeoutException; *

* 在clientSession中选择一个可用的session,最终还是调用的IRouter中的方法 * - * @author jaysunxiao + * @author godotg * @version 3.0 */ public class Consumer implements IConsumer { @@ -78,8 +77,8 @@ public class Consumer implements IConsumer { try { var loadBalancer = loadBalancer(ProtocolManager.moduleByProtocolId(packet.protocolId())); var session = loadBalancer.loadBalancer(packet, argument); - var executorConsistentHash = (argument == null) ? RandomUtils.randomInt() : HashUtils.fnvHash(argument); - NetContext.getRouter().send(session, packet, NoAnswerAttachment.valueOf(executorConsistentHash)); + var taskExecutorHash = TaskBus.calTaskExecutorHash(argument); + NetContext.getRouter().send(session, packet, NoAnswerAttachment.valueOf(taskExecutorHash)); } catch (Throwable t) { logger.error("consumer发送未知异常", t); } @@ -93,8 +92,8 @@ public class Consumer implements IConsumer { // 下面的代码逻辑同Router的syncAsk,如果修改的话,记得一起修改 var clientSignalAttachment = new SignalAttachment(); - var executorConsistentHash = (argument == null) ? RandomUtils.randomInt() : HashUtils.fnvHash(argument); - clientSignalAttachment.setExecutorConsistentHash(executorConsistentHash); + var taskExecutorHash = TaskBus.calTaskExecutorHash(argument); + clientSignalAttachment.setTaskExecutorHash(taskExecutorHash); try { SignalBridge.addSignalAttachment(clientSignalAttachment); diff --git a/net/src/main/java/com/zfoo/net/consumer/IConsumer.java b/net/src/main/java/com/zfoo/net/consumer/IConsumer.java index c1f33e5c..3f9a45bb 100644 --- a/net/src/main/java/com/zfoo/net/consumer/IConsumer.java +++ b/net/src/main/java/com/zfoo/net/consumer/IConsumer.java @@ -21,7 +21,7 @@ import com.zfoo.protocol.registration.ProtocolModule; import org.springframework.lang.Nullable; /** - * @author jaysunxiao + * @author godotg * @version 3.0 */ public interface IConsumer { diff --git a/net/src/main/java/com/zfoo/net/consumer/balancer/AbstractConsumerLoadBalancer.java b/net/src/main/java/com/zfoo/net/consumer/balancer/AbstractConsumerLoadBalancer.java index 6ee263e3..399521cf 100644 --- a/net/src/main/java/com/zfoo/net/consumer/balancer/AbstractConsumerLoadBalancer.java +++ b/net/src/main/java/com/zfoo/net/consumer/balancer/AbstractConsumerLoadBalancer.java @@ -19,7 +19,6 @@ import com.zfoo.net.session.model.AttributeType; import com.zfoo.net.session.model.Session; import com.zfoo.protocol.IPacket; import com.zfoo.protocol.ProtocolManager; -import com.zfoo.protocol.model.Pair; import com.zfoo.protocol.registration.ProtocolModule; import com.zfoo.protocol.util.StringUtils; @@ -29,7 +28,7 @@ import java.util.Objects; import java.util.stream.Collectors; /** - * @author jaysunxiao + * @author godotg * @version 3.0 */ public abstract class AbstractConsumerLoadBalancer implements IConsumerLoadBalancer { diff --git a/net/src/main/java/com/zfoo/net/consumer/balancer/ConsistentHashConsumerLoadBalancer.java b/net/src/main/java/com/zfoo/net/consumer/balancer/ConsistentHashConsumerLoadBalancer.java index 788f81a5..0a0470f6 100644 --- a/net/src/main/java/com/zfoo/net/consumer/balancer/ConsistentHashConsumerLoadBalancer.java +++ b/net/src/main/java/com/zfoo/net/consumer/balancer/ConsistentHashConsumerLoadBalancer.java @@ -35,7 +35,7 @@ import java.util.stream.Collectors; *

* 通过argument计算一致性hash * - * @author jaysunxiao + * @author godotg * @version 3.0 */ public class ConsistentHashConsumerLoadBalancer extends AbstractConsumerLoadBalancer { diff --git a/net/src/main/java/com/zfoo/net/consumer/balancer/IConsumerLoadBalancer.java b/net/src/main/java/com/zfoo/net/consumer/balancer/IConsumerLoadBalancer.java index 85f3f92a..b631bf3e 100644 --- a/net/src/main/java/com/zfoo/net/consumer/balancer/IConsumerLoadBalancer.java +++ b/net/src/main/java/com/zfoo/net/consumer/balancer/IConsumerLoadBalancer.java @@ -19,7 +19,7 @@ import com.zfoo.protocol.IPacket; import org.springframework.lang.Nullable; /** - * @author jaysunxiao + * @author godotg * @version 3.0 */ public interface IConsumerLoadBalancer { diff --git a/net/src/main/java/com/zfoo/net/consumer/balancer/RandomConsumerLoadBalancer.java b/net/src/main/java/com/zfoo/net/consumer/balancer/RandomConsumerLoadBalancer.java index 3d1d7d3e..43fe3948 100644 --- a/net/src/main/java/com/zfoo/net/consumer/balancer/RandomConsumerLoadBalancer.java +++ b/net/src/main/java/com/zfoo/net/consumer/balancer/RandomConsumerLoadBalancer.java @@ -22,7 +22,7 @@ import com.zfoo.util.math.RandomUtils; /** * 随机负载均衡器,任选服务提供者的其中之一 * - * @author jaysunxiao + * @author godotg * @version 3.0 */ public class RandomConsumerLoadBalancer extends AbstractConsumerLoadBalancer { diff --git a/net/src/main/java/com/zfoo/net/consumer/balancer/ShortestTimeConsumerLoadBalancer.java b/net/src/main/java/com/zfoo/net/consumer/balancer/ShortestTimeConsumerLoadBalancer.java index 6dda500e..c59a9cb0 100644 --- a/net/src/main/java/com/zfoo/net/consumer/balancer/ShortestTimeConsumerLoadBalancer.java +++ b/net/src/main/java/com/zfoo/net/consumer/balancer/ShortestTimeConsumerLoadBalancer.java @@ -27,7 +27,7 @@ import java.util.concurrent.ConcurrentHashMap; /** * 最少时间调用负载均衡器,优先选择调用时间最短的session * - * @author jaysunxiao + * @author godotg * @version 3.0 */ public class ShortestTimeConsumerLoadBalancer extends AbstractConsumerLoadBalancer { diff --git a/net/src/main/java/com/zfoo/net/consumer/event/ConsumerStartEvent.java b/net/src/main/java/com/zfoo/net/consumer/event/ConsumerStartEvent.java index 669316e5..56b29bf3 100644 --- a/net/src/main/java/com/zfoo/net/consumer/event/ConsumerStartEvent.java +++ b/net/src/main/java/com/zfoo/net/consumer/event/ConsumerStartEvent.java @@ -18,7 +18,7 @@ import com.zfoo.net.consumer.registry.RegisterVO; import com.zfoo.net.session.model.Session; /** - * @author jaysunxiao + * @author godotg * @version 3.0 */ public class ConsumerStartEvent implements IEvent { diff --git a/net/src/main/java/com/zfoo/net/consumer/registry/IRegistry.java b/net/src/main/java/com/zfoo/net/consumer/registry/IRegistry.java index a86ed023..048b185a 100644 --- a/net/src/main/java/com/zfoo/net/consumer/registry/IRegistry.java +++ b/net/src/main/java/com/zfoo/net/consumer/registry/IRegistry.java @@ -22,7 +22,7 @@ import java.util.function.BiConsumer; import java.util.function.Consumer; /** - * @author jaysunxiao + * @author godotg * @version 3.0 */ public interface IRegistry { diff --git a/net/src/main/java/com/zfoo/net/consumer/registry/RegisterVO.java b/net/src/main/java/com/zfoo/net/consumer/registry/RegisterVO.java index ffaaee8c..0f5fa235 100644 --- a/net/src/main/java/com/zfoo/net/consumer/registry/RegisterVO.java +++ b/net/src/main/java/com/zfoo/net/consumer/registry/RegisterVO.java @@ -32,7 +32,7 @@ import java.util.Objects; import java.util.stream.Collectors; /** - * @author jaysunxiao + * @author godotg * @version 3.0 */ public class RegisterVO { diff --git a/net/src/main/java/com/zfoo/net/handler/GatewayRouteHandler.java b/net/src/main/java/com/zfoo/net/handler/GatewayRouteHandler.java index 6a9e8301..45fb2c1c 100644 --- a/net/src/main/java/com/zfoo/net/handler/GatewayRouteHandler.java +++ b/net/src/main/java/com/zfoo/net/handler/GatewayRouteHandler.java @@ -87,7 +87,7 @@ public class GatewayRouteHandler extends ServerRouteHandler { // 例子:以聊天服务来说,玩家知道自己在哪个群组groupId中,那往这个群发送消息时,会在Packet中带上这个groupId做为一致性hash就可以了。 if (packet instanceof IGatewayLoadBalancer) { var loadBalancerConsistentHashObject = ((IGatewayLoadBalancer) packet).loadBalancerConsistentHashObject(); - gatewayAttachment.useExecutorConsistentHash(loadBalancerConsistentHashObject); + gatewayAttachment.wrapTaskExecutorHash(loadBalancerConsistentHashObject); forwardingPacket(packet, gatewayAttachment, loadBalancerConsistentHashObject); return; } else { diff --git a/net/src/main/java/com/zfoo/net/router/Router.java b/net/src/main/java/com/zfoo/net/router/Router.java index cc65c7d6..1b059982 100644 --- a/net/src/main/java/com/zfoo/net/router/Router.java +++ b/net/src/main/java/com/zfoo/net/router/Router.java @@ -40,8 +40,6 @@ import com.zfoo.protocol.IPacket; import com.zfoo.protocol.exception.ExceptionUtils; import com.zfoo.protocol.util.JsonUtils; import com.zfoo.protocol.util.StringUtils; -import com.zfoo.util.math.HashUtils; -import com.zfoo.util.math.RandomUtils; import io.netty.util.concurrent.FastThreadLocal; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -96,8 +94,7 @@ public class Router implements IRouter { // 这里会让之前的CompletableFuture得到结果,从而像asyncAsk之类的回调到结果 removedAttachment.getResponseFuture().complete(packet); } else { - logger.error("client receives packet:[{}] and attachment:[{}] from server, but clientAttachmentMap has no attachment, perhaps timeout exception." - , JsonUtils.object2String(packet), JsonUtils.object2String(attachment)); + logger.error("client receives packet:[{}] and attachment:[{}] from server, but clientAttachmentMap has no attachment, perhaps timeout exception.", JsonUtils.object2String(packet), JsonUtils.object2String(attachment)); } // 注意:这个return,这样子,asyncAsk的结果就返回了。 @@ -137,9 +134,7 @@ public class Router implements IRouter { } send(gatewaySession, packet, signalAttachmentInGatewayAttachment); } else { - logger.error("gateway receives packet:[{}] and attachment:[{}] from server" + - ", but serverSessionMap has no session[id:{}], perhaps client disconnected from gateway." - , JsonUtils.object2String(packet), JsonUtils.object2String(attachment), gatewayAttachment.getSid()); + logger.error("gateway receives packet:[{}] and attachment:[{}] from server" + ", but serverSessionMap has no session[id:{}], perhaps client disconnected from gateway.", JsonUtils.object2String(packet), JsonUtils.object2String(attachment), gatewayAttachment.getSid()); } return; } @@ -196,8 +191,8 @@ public class Router implements IRouter { @Override public SyncAnswer syncAsk(Session session, IPacket packet, @Nullable Class answerClass, @Nullable Object argument) throws Exception { var clientSignalAttachment = new SignalAttachment(); - var executorConsistentHash = (argument == null) ? RandomUtils.randomInt() : HashUtils.fnvHash(argument); - clientSignalAttachment.setExecutorConsistentHash(executorConsistentHash); + var taskExecutorHash = TaskBus.calTaskExecutorHash(argument); + clientSignalAttachment.setTaskExecutorHash(taskExecutorHash); try { SignalBridge.addSignalAttachment(clientSignalAttachment); @@ -211,14 +206,12 @@ public class Router implements IRouter { throw new ErrorResponseException((Error) responsePacket); } if (answerClass != null && answerClass != responsePacket.getClass()) { - throw new UnexpectedProtocolException(StringUtils.format("client expect protocol:[{}], but found protocol:[{}]" - , answerClass, responsePacket.getClass().getName())); + throw new UnexpectedProtocolException(StringUtils.format("client expect protocol:[{}], but found protocol:[{}]", answerClass, responsePacket.getClass().getName())); } return new SyncAnswer<>((T) responsePacket, clientSignalAttachment); } catch (TimeoutException e) { - throw new NetTimeOutException(StringUtils.format("syncAsk timeout exception, ask:[{}], attachment:[{}]" - , JsonUtils.object2String(packet), JsonUtils.object2String(clientSignalAttachment))); + throw new NetTimeOutException(StringUtils.format("syncAsk timeout exception, ask:[{}], attachment:[{}]", JsonUtils.object2String(packet), JsonUtils.object2String(clientSignalAttachment))); } finally { SignalBridge.removeSignalAttachment(clientSignalAttachment); } @@ -233,11 +226,9 @@ public class Router implements IRouter { @Override public AsyncAnswer asyncAsk(Session session, IPacket packet, @Nullable Class answerClass, @Nullable Object argument) { var clientSignalAttachment = new SignalAttachment(); - // 因此第3个参数传null,会得到一个随机的值,在得到结果回调时,是随机的线程 - // 这个值用于返回时,在CompletableFuture中选择哪个线程执行,也就是回到哪个线程 - var executorConsistentHash = (argument == null) ? RandomUtils.randomInt() : HashUtils.fnvHash(argument); + var taskExecutorHash = TaskBus.calTaskExecutorHash(argument); - clientSignalAttachment.setExecutorConsistentHash(executorConsistentHash); + clientSignalAttachment.setTaskExecutorHash(taskExecutorHash); // 服务器在同步或异步的消息处理中,又调用了同步或异步的方法,这时候threadReceiverAttachment不为空 var serverSignalAttachment = serverReceiveSignalAttachmentThreadLocal.get(); @@ -248,8 +239,7 @@ public class Router implements IRouter { clientSignalAttachment.getResponseFuture() // 因此超时的情况,返回的是null - .completeOnTimeout(null, DEFAULT_TIMEOUT, TimeUnit.MILLISECONDS) - .thenApply(answer -> { + .completeOnTimeout(null, DEFAULT_TIMEOUT, TimeUnit.MILLISECONDS).thenApply(answer -> { if (answer == null) { throw new NetTimeOutException(StringUtils.format("async ask [{}] timeout exception", packet.getClass().getSimpleName())); } @@ -262,8 +252,7 @@ public class Router implements IRouter { throw new UnexpectedProtocolException("client expect protocol:[{}], but found protocol:[{}]", answerClass, answer.getClass().getName()); } return answer; - }) - .whenCompleteAsync((answer, throwable) -> { + }).whenCompleteAsync((answer, throwable) -> { // 注意:进入这个方法的时机是:在上面的receive方法中,由于是asyncAsk的消息,attachment不为空,会调用CompletableFuture的complete方法 try { SignalBridge.removeSignalAttachment(clientSignalAttachment); diff --git a/net/src/main/java/com/zfoo/net/router/attachment/GatewayAttachment.java b/net/src/main/java/com/zfoo/net/router/attachment/GatewayAttachment.java index 152beaf3..f4da2fef 100644 --- a/net/src/main/java/com/zfoo/net/router/attachment/GatewayAttachment.java +++ b/net/src/main/java/com/zfoo/net/router/attachment/GatewayAttachment.java @@ -14,7 +14,6 @@ package com.zfoo.net.router.attachment; import com.zfoo.net.session.model.AttributeType; import com.zfoo.net.session.model.Session; -import com.zfoo.util.math.HashUtils; import org.springframework.lang.Nullable; /** @@ -23,7 +22,7 @@ import org.springframework.lang.Nullable; */ public class GatewayAttachment implements IAttachment { - public static final transient short PROTOCOL_ID = 1; + public static final short PROTOCOL_ID = 1; /** * session id @@ -38,12 +37,8 @@ public class GatewayAttachment implements IAttachment { */ private long uid; - /** - * EN:Whether to use a consistent hash ID as a consistent hash ID - * CN:是否使用consistentHashId作为一致性hashId - */ - private boolean useExecutorConsistentHash; - private int executorConsistentHash; + private boolean useTaskExecutorHashParam; + private int taskExecutorHashParam; /** * true for the client, false for the server @@ -81,12 +76,8 @@ public class GatewayAttachment implements IAttachment { } @Override - public int executorConsistentHash() { - if (useExecutorConsistentHash) { - return executorConsistentHash; - } else { - return HashUtils.fnvHash(uid); - } + public int taskExecutorHash() { + return useTaskExecutorHashParam ? taskExecutorHashParam : (int) uid; } @Override @@ -94,9 +85,9 @@ public class GatewayAttachment implements IAttachment { return PROTOCOL_ID; } - public void useExecutorConsistentHash(Object argument) { - this.useExecutorConsistentHash = true; - this.executorConsistentHash = HashUtils.fnvHash(argument); + public void wrapTaskExecutorHash(Object argument) { + this.useTaskExecutorHashParam = true; + this.taskExecutorHashParam = argument.hashCode(); } public long getSid() { @@ -115,20 +106,20 @@ public class GatewayAttachment implements IAttachment { this.uid = uid; } - public boolean isUseExecutorConsistentHash() { - return useExecutorConsistentHash; + public boolean isUseTaskExecutorHashParam() { + return useTaskExecutorHashParam; } - public void setUseExecutorConsistentHash(boolean useExecutorConsistentHash) { - this.useExecutorConsistentHash = useExecutorConsistentHash; + public void setUseTaskExecutorHashParam(boolean useTaskExecutorHashParam) { + this.useTaskExecutorHashParam = useTaskExecutorHashParam; } - public int getExecutorConsistentHash() { - return executorConsistentHash; + public int getTaskExecutorHashParam() { + return taskExecutorHashParam; } - public void setExecutorConsistentHash(int executorConsistentHash) { - this.executorConsistentHash = executorConsistentHash; + public void setTaskExecutorHashParam(int taskExecutorHashParam) { + this.taskExecutorHashParam = taskExecutorHashParam; } public boolean isClient() { diff --git a/net/src/main/java/com/zfoo/net/router/attachment/HttpAttachment.java b/net/src/main/java/com/zfoo/net/router/attachment/HttpAttachment.java index 132e79b4..701842d9 100644 --- a/net/src/main/java/com/zfoo/net/router/attachment/HttpAttachment.java +++ b/net/src/main/java/com/zfoo/net/router/attachment/HttpAttachment.java @@ -12,7 +12,6 @@ package com.zfoo.net.router.attachment; -import com.zfoo.util.math.HashUtils; import io.netty.handler.codec.http.FullHttpRequest; import io.netty.handler.codec.http.HttpResponseStatus; @@ -22,13 +21,13 @@ import io.netty.handler.codec.http.HttpResponseStatus; */ public class HttpAttachment implements IAttachment { - public static final transient short PROTOCOL_ID = 3; + public static final short PROTOCOL_ID = 3; private long uid; - private boolean useExecutorConsistentHash; + private boolean useTaskExecutorHashParam; - private int executorConsistentHash; + private int taskExecutorHashParam; private transient FullHttpRequest fullHttpRequest; @@ -47,12 +46,8 @@ public class HttpAttachment implements IAttachment { } @Override - public int executorConsistentHash() { - if (useExecutorConsistentHash) { - return executorConsistentHash; - } else { - return HashUtils.fnvHash(uid); - } + public int taskExecutorHash() { + return useTaskExecutorHashParam ? taskExecutorHashParam : (int) uid; } @Override @@ -60,9 +55,9 @@ public class HttpAttachment implements IAttachment { return PROTOCOL_ID; } - public void useExecutorConsistentHash(Object argument) { - this.useExecutorConsistentHash = true; - this.executorConsistentHash = HashUtils.fnvHash(argument); + public void wrapTaskExecutorHash(Object argument) { + this.useTaskExecutorHashParam = true; + this.taskExecutorHashParam = argument.hashCode(); } public long getUid() { @@ -73,20 +68,20 @@ public class HttpAttachment implements IAttachment { this.uid = uid; } - public boolean isUseExecutorConsistentHash() { - return useExecutorConsistentHash; + public boolean isUseTaskExecutorHashParam() { + return useTaskExecutorHashParam; } - public void setUseExecutorConsistentHash(boolean useExecutorConsistentHash) { - this.useExecutorConsistentHash = useExecutorConsistentHash; + public void setUseTaskExecutorHashParam(boolean useTaskExecutorHashParam) { + this.useTaskExecutorHashParam = useTaskExecutorHashParam; } - public int getExecutorConsistentHash() { - return executorConsistentHash; + public int getTaskExecutorHashParam() { + return taskExecutorHashParam; } - public void setExecutorConsistentHash(int executorConsistentHash) { - this.executorConsistentHash = executorConsistentHash; + public void setTaskExecutorHashParam(int taskExecutorHashParam) { + this.taskExecutorHashParam = taskExecutorHashParam; } public FullHttpRequest getFullHttpRequest() { diff --git a/net/src/main/java/com/zfoo/net/router/attachment/IAttachment.java b/net/src/main/java/com/zfoo/net/router/attachment/IAttachment.java index 677c55ff..fafa6869 100644 --- a/net/src/main/java/com/zfoo/net/router/attachment/IAttachment.java +++ b/net/src/main/java/com/zfoo/net/router/attachment/IAttachment.java @@ -25,9 +25,7 @@ public interface IAttachment extends IPacket { /** * EN:Used to determine which thread the message is processed on * CN:用来确定这条消息在哪一个线程处理 - * - * @return 一致性hashId */ - int executorConsistentHash(); + int taskExecutorHash(); } diff --git a/net/src/main/java/com/zfoo/net/router/attachment/NoAnswerAttachment.java b/net/src/main/java/com/zfoo/net/router/attachment/NoAnswerAttachment.java index 3cdd140f..5cc769f6 100644 --- a/net/src/main/java/com/zfoo/net/router/attachment/NoAnswerAttachment.java +++ b/net/src/main/java/com/zfoo/net/router/attachment/NoAnswerAttachment.java @@ -20,13 +20,13 @@ package com.zfoo.net.router.attachment; */ public class NoAnswerAttachment implements IAttachment { - public static final transient short PROTOCOL_ID = 4; + public static final short PROTOCOL_ID = 4; - private int executorConsistentHash; + private int taskExecutorHash; - public static NoAnswerAttachment valueOf(int executorConsistentHash) { + public static NoAnswerAttachment valueOf(int taskExecutorHash) { var attachment = new NoAnswerAttachment(); - attachment.executorConsistentHash = executorConsistentHash; + attachment.taskExecutorHash = taskExecutorHash; return attachment; } @@ -36,8 +36,8 @@ public class NoAnswerAttachment implements IAttachment { } @Override - public int executorConsistentHash() { - return executorConsistentHash; + public int taskExecutorHash() { + return taskExecutorHash; } @Override @@ -45,12 +45,11 @@ public class NoAnswerAttachment implements IAttachment { return PROTOCOL_ID; } - - public int getExecutorConsistentHash() { - return executorConsistentHash; + public int getTaskExecutorHash() { + return taskExecutorHash; } - public void setExecutorConsistentHash(int executorConsistentHash) { - this.executorConsistentHash = executorConsistentHash; + public void setTaskExecutorHash(int taskExecutorHash) { + this.taskExecutorHash = taskExecutorHash; } } diff --git a/net/src/main/java/com/zfoo/net/router/attachment/SignalAttachment.java b/net/src/main/java/com/zfoo/net/router/attachment/SignalAttachment.java index 95eddc89..4d90504a 100644 --- a/net/src/main/java/com/zfoo/net/router/attachment/SignalAttachment.java +++ b/net/src/main/java/com/zfoo/net/router/attachment/SignalAttachment.java @@ -24,7 +24,7 @@ import java.util.concurrent.atomic.AtomicInteger; */ public class SignalAttachment implements IAttachment { - public static final transient short PROTOCOL_ID = 0; + public static final short PROTOCOL_ID = 0; public static final AtomicInteger ATOMIC_ID = new AtomicInteger(0); @@ -35,10 +35,10 @@ public class SignalAttachment implements IAttachment { private int signalId = ATOMIC_ID.incrementAndGet(); /** - * EN:The parameter used to calculate the consistency hash in Task Bus - * CN:用来在TaskBus中计算一致性hash的参数 + * EN:The parameter used to calculate the hash in TaskBus to determine which thread the task is executed on + * CN:用来在TaskBus中计算hash的参数,用来决定任务在哪一条线程执行 */ - private int executorConsistentHash = -1; + private int taskExecutorHash = -1; /** * true for the client, false for the server @@ -66,8 +66,8 @@ public class SignalAttachment implements IAttachment { } @Override - public int executorConsistentHash() { - return executorConsistentHash; + public int taskExecutorHash() { + return taskExecutorHash; } public long getTimestamp() { @@ -109,12 +109,12 @@ public class SignalAttachment implements IAttachment { this.signalId = signalId; } - public int getExecutorConsistentHash() { - return executorConsistentHash; + public int getTaskExecutorHash() { + return taskExecutorHash; } - public void setExecutorConsistentHash(int executorConsistentHash) { - this.executorConsistentHash = executorConsistentHash; + public void setTaskExecutorHash(int taskExecutorHash) { + this.taskExecutorHash = taskExecutorHash; } public boolean isClient() { diff --git a/net/src/main/java/com/zfoo/net/router/attachment/UdpAttachment.java b/net/src/main/java/com/zfoo/net/router/attachment/UdpAttachment.java index f60da67e..2832d729 100644 --- a/net/src/main/java/com/zfoo/net/router/attachment/UdpAttachment.java +++ b/net/src/main/java/com/zfoo/net/router/attachment/UdpAttachment.java @@ -20,7 +20,7 @@ import com.zfoo.util.math.RandomUtils; */ public class UdpAttachment implements IAttachment { - public static final transient short PROTOCOL_ID = 2; + public static final short PROTOCOL_ID = 2; private String host; private int port; @@ -38,7 +38,7 @@ public class UdpAttachment implements IAttachment { } @Override - public int executorConsistentHash() { + public int taskExecutorHash() { return RandomUtils.randomInt(); } diff --git a/net/src/main/java/com/zfoo/net/task/TaskBus.java b/net/src/main/java/com/zfoo/net/task/TaskBus.java index ba6a927a..cf69d938 100644 --- a/net/src/main/java/com/zfoo/net/task/TaskBus.java +++ b/net/src/main/java/com/zfoo/net/task/TaskBus.java @@ -97,17 +97,17 @@ public final class TaskBus { *

* zfoo中通过对线程池的细粒度控制,从而实现了Actor模型。 * 为了简单,可以把Actor可以理解为一个用户或者一个玩家。 - * 因为同一个用户或者玩家的uid是固定的,通过uid去计算一致性hash(executorConsistentHash)永远会得到一致的结果, - * 从而保证同一个用户或者玩家的请求总能通过executorConsistentHash被路由到同一台服务器的同一个线程去执行,从而避免了并发,实现了无锁化。 + * 因为同一个用户或者玩家的uid是固定的,通过uid去计算一致性hash(taskExecutorHash)永远会得到一致的结果, + * 从而保证同一个用户或者玩家的请求总能通过taskExecutorHash被路由到同一台服务器的同一个线程去执行,从而避免了并发,实现了无锁化。 *

- * zfoo所代表的Actor模型,是更加精简的Actor模型,让上层调用无感知,在zfoo中可以简单的理解 actor = executorConsistentHash。 + * zfoo所代表的Actor模型,是更加精简的Actor模型,让上层调用无感知,在zfoo中可以简单的理解 actor = taskExecutorHash。 *

* 在zfoo这套线程模型中,保证了服务器所接收到的Packet(最终被包装成PacketReceiverTask任务),永远只会在同一条线程处理, - * TaskBus通过AbstractTaskDispatch去派发PacketReceiverTask任务,具体在哪个线程处理通过IAttachment的executorConsistentHash计算。 + * TaskBus通过AbstractTaskDispatch去派发PacketReceiverTask任务,具体在哪个线程处理通过IAttachment的taskExecutorHash计算。 *

- * IAttachment的不同,executorConsistentHash也不同: - * GatewayAttachment:默认是executorConsistentHash等于用户活玩家的uid,也可以通过IGatewayLoadBalancer接口指定 - * SignalAttachment:executorConsistentHash通过IRouter和IConsumer的argument参数指定 + * IAttachment的不同,taskExecutorHash也不同: + * GatewayAttachment:默认是taskExecutorHash等于用户活玩家的uid,也可以通过IGatewayLoadBalancer接口指定 + * SignalAttachment:taskExecutorHash通过IRouter和IConsumer的argument参数指定 */ public static void dispatch(PacketReceiverTask task) { var attachment = task.getAttachment(); @@ -121,16 +121,24 @@ public final class TaskBus { execute((int) uid, task); } } else { - execute(attachment.executorConsistentHash(), task); + execute(attachment.taskExecutorHash(), task); } } - public static int executorIndex(int executorConsistentHash) { - return Math.abs(executorConsistentHash % EXECUTOR_SIZE); + public static int calTaskExecutorHash(int taskExecutorHash) { + return Math.abs(taskExecutorHash) % EXECUTOR_SIZE; } - public static void execute(int executorConsistentHash, Runnable runnable) { - executors[executorIndex(executorConsistentHash)].execute(SafeRunnable.valueOf(runnable)); + public static int calTaskExecutorHash(Object argument) { + return calTaskExecutorHash((argument == null) ? RandomUtils.randomInt() : argument.hashCode()); + } + + public static void execute(int taskExecutorHash, Runnable runnable) { + executors[calTaskExecutorHash(taskExecutorHash)].execute(SafeRunnable.valueOf(runnable)); + } + + public static void execute(Object argument, Runnable runnable) { + execute(calTaskExecutorHash(argument), runnable); } // 在task,event,scheduler线程执行的异步请求,请求成功过后依然在相同的线程执行回调任务 @@ -151,7 +159,7 @@ public final class TaskBus { return schedulerExecutor; } - return executors[executorIndex(RandomUtils.randomInt())]; + return executors[calTaskExecutorHash(RandomUtils.randomInt())]; } }