ref[net]: fefactored task bus scheduling to use faster hashcode as the default calculation parameter

This commit is contained in:
godotg
2022-12-11 19:53:57 +08:00
parent fb0728d204
commit 4d211f0a53
19 changed files with 102 additions and 124 deletions
@@ -26,14 +26,13 @@ import com.zfoo.net.router.exception.ErrorResponseException;
import com.zfoo.net.router.exception.NetTimeOutException;
import com.zfoo.net.router.exception.UnexpectedProtocolException;
import com.zfoo.net.router.route.SignalBridge;
import com.zfoo.net.task.TaskBus;
import com.zfoo.protocol.IPacket;
import com.zfoo.protocol.ProtocolManager;
import com.zfoo.protocol.collection.CollectionUtils;
import com.zfoo.protocol.registration.ProtocolModule;
import com.zfoo.protocol.util.JsonUtils;
import com.zfoo.protocol.util.StringUtils;
import com.zfoo.util.math.HashUtils;
import com.zfoo.util.math.RandomUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -47,7 +46,7 @@ import java.util.concurrent.TimeoutException;
* <p>
* 在clientSession中选择一个可用的session,最终还是调用的IRouter中的方法
*
* @author jaysunxiao
* @author godotg
* @version 3.0
*/
public class Consumer implements IConsumer {
@@ -78,8 +77,8 @@ public class Consumer implements IConsumer {
try {
var loadBalancer = loadBalancer(ProtocolManager.moduleByProtocolId(packet.protocolId()));
var session = loadBalancer.loadBalancer(packet, argument);
var executorConsistentHash = (argument == null) ? RandomUtils.randomInt() : HashUtils.fnvHash(argument);
NetContext.getRouter().send(session, packet, NoAnswerAttachment.valueOf(executorConsistentHash));
var taskExecutorHash = TaskBus.calTaskExecutorHash(argument);
NetContext.getRouter().send(session, packet, NoAnswerAttachment.valueOf(taskExecutorHash));
} catch (Throwable t) {
logger.error("consumer发送未知异常", t);
}
@@ -93,8 +92,8 @@ public class Consumer implements IConsumer {
// 下面的代码逻辑同Router的syncAsk,如果修改的话,记得一起修改
var clientSignalAttachment = new SignalAttachment();
var executorConsistentHash = (argument == null) ? RandomUtils.randomInt() : HashUtils.fnvHash(argument);
clientSignalAttachment.setExecutorConsistentHash(executorConsistentHash);
var taskExecutorHash = TaskBus.calTaskExecutorHash(argument);
clientSignalAttachment.setTaskExecutorHash(taskExecutorHash);
try {
SignalBridge.addSignalAttachment(clientSignalAttachment);
@@ -21,7 +21,7 @@ import com.zfoo.protocol.registration.ProtocolModule;
import org.springframework.lang.Nullable;
/**
* @author jaysunxiao
* @author godotg
* @version 3.0
*/
public interface IConsumer {
@@ -19,7 +19,6 @@ import com.zfoo.net.session.model.AttributeType;
import com.zfoo.net.session.model.Session;
import com.zfoo.protocol.IPacket;
import com.zfoo.protocol.ProtocolManager;
import com.zfoo.protocol.model.Pair;
import com.zfoo.protocol.registration.ProtocolModule;
import com.zfoo.protocol.util.StringUtils;
@@ -29,7 +28,7 @@ import java.util.Objects;
import java.util.stream.Collectors;
/**
* @author jaysunxiao
* @author godotg
* @version 3.0
*/
public abstract class AbstractConsumerLoadBalancer implements IConsumerLoadBalancer {
@@ -35,7 +35,7 @@ import java.util.stream.Collectors;
* <p>
* 通过argument计算一致性hash
*
* @author jaysunxiao
* @author godotg
* @version 3.0
*/
public class ConsistentHashConsumerLoadBalancer extends AbstractConsumerLoadBalancer {
@@ -19,7 +19,7 @@ import com.zfoo.protocol.IPacket;
import org.springframework.lang.Nullable;
/**
* @author jaysunxiao
* @author godotg
* @version 3.0
*/
public interface IConsumerLoadBalancer {
@@ -22,7 +22,7 @@ import com.zfoo.util.math.RandomUtils;
/**
* 随机负载均衡器,任选服务提供者的其中之一
*
* @author jaysunxiao
* @author godotg
* @version 3.0
*/
public class RandomConsumerLoadBalancer extends AbstractConsumerLoadBalancer {
@@ -27,7 +27,7 @@ import java.util.concurrent.ConcurrentHashMap;
/**
* 最少时间调用负载均衡器,优先选择调用时间最短的session
*
* @author jaysunxiao
* @author godotg
* @version 3.0
*/
public class ShortestTimeConsumerLoadBalancer extends AbstractConsumerLoadBalancer {
@@ -18,7 +18,7 @@ import com.zfoo.net.consumer.registry.RegisterVO;
import com.zfoo.net.session.model.Session;
/**
* @author jaysunxiao
* @author godotg
* @version 3.0
*/
public class ConsumerStartEvent implements IEvent {
@@ -22,7 +22,7 @@ import java.util.function.BiConsumer;
import java.util.function.Consumer;
/**
* @author jaysunxiao
* @author godotg
* @version 3.0
*/
public interface IRegistry {
@@ -32,7 +32,7 @@ import java.util.Objects;
import java.util.stream.Collectors;
/**
* @author jaysunxiao
* @author godotg
* @version 3.0
*/
public class RegisterVO {
@@ -87,7 +87,7 @@ public class GatewayRouteHandler extends ServerRouteHandler {
// 例子:以聊天服务来说,玩家知道自己在哪个群组groupId中,那往这个群发送消息时,会在Packet中带上这个groupId做为一致性hash就可以了。
if (packet instanceof IGatewayLoadBalancer) {
var loadBalancerConsistentHashObject = ((IGatewayLoadBalancer) packet).loadBalancerConsistentHashObject();
gatewayAttachment.useExecutorConsistentHash(loadBalancerConsistentHashObject);
gatewayAttachment.wrapTaskExecutorHash(loadBalancerConsistentHashObject);
forwardingPacket(packet, gatewayAttachment, loadBalancerConsistentHashObject);
return;
} else {
@@ -40,8 +40,6 @@ import com.zfoo.protocol.IPacket;
import com.zfoo.protocol.exception.ExceptionUtils;
import com.zfoo.protocol.util.JsonUtils;
import com.zfoo.protocol.util.StringUtils;
import com.zfoo.util.math.HashUtils;
import com.zfoo.util.math.RandomUtils;
import io.netty.util.concurrent.FastThreadLocal;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -96,8 +94,7 @@ public class Router implements IRouter {
// 这里会让之前的CompletableFuture得到结果,从而像asyncAsk之类的回调到结果
removedAttachment.getResponseFuture().complete(packet);
} else {
logger.error("client receives packet:[{}] and attachment:[{}] from server, but clientAttachmentMap has no attachment, perhaps timeout exception."
, JsonUtils.object2String(packet), JsonUtils.object2String(attachment));
logger.error("client receives packet:[{}] and attachment:[{}] from server, but clientAttachmentMap has no attachment, perhaps timeout exception.", JsonUtils.object2String(packet), JsonUtils.object2String(attachment));
}
// 注意:这个return,这样子,asyncAsk的结果就返回了。
@@ -137,9 +134,7 @@ public class Router implements IRouter {
}
send(gatewaySession, packet, signalAttachmentInGatewayAttachment);
} else {
logger.error("gateway receives packet:[{}] and attachment:[{}] from server" +
", but serverSessionMap has no session[id:{}], perhaps client disconnected from gateway."
, JsonUtils.object2String(packet), JsonUtils.object2String(attachment), gatewayAttachment.getSid());
logger.error("gateway receives packet:[{}] and attachment:[{}] from server" + ", but serverSessionMap has no session[id:{}], perhaps client disconnected from gateway.", JsonUtils.object2String(packet), JsonUtils.object2String(attachment), gatewayAttachment.getSid());
}
return;
}
@@ -196,8 +191,8 @@ public class Router implements IRouter {
@Override
public <T extends IPacket> SyncAnswer<T> syncAsk(Session session, IPacket packet, @Nullable Class<T> answerClass, @Nullable Object argument) throws Exception {
var clientSignalAttachment = new SignalAttachment();
var executorConsistentHash = (argument == null) ? RandomUtils.randomInt() : HashUtils.fnvHash(argument);
clientSignalAttachment.setExecutorConsistentHash(executorConsistentHash);
var taskExecutorHash = TaskBus.calTaskExecutorHash(argument);
clientSignalAttachment.setTaskExecutorHash(taskExecutorHash);
try {
SignalBridge.addSignalAttachment(clientSignalAttachment);
@@ -211,14 +206,12 @@ public class Router implements IRouter {
throw new ErrorResponseException((Error) responsePacket);
}
if (answerClass != null && answerClass != responsePacket.getClass()) {
throw new UnexpectedProtocolException(StringUtils.format("client expect protocol:[{}], but found protocol:[{}]"
, answerClass, responsePacket.getClass().getName()));
throw new UnexpectedProtocolException(StringUtils.format("client expect protocol:[{}], but found protocol:[{}]", answerClass, responsePacket.getClass().getName()));
}
return new SyncAnswer<>((T) responsePacket, clientSignalAttachment);
} catch (TimeoutException e) {
throw new NetTimeOutException(StringUtils.format("syncAsk timeout exception, ask:[{}], attachment:[{}]"
, JsonUtils.object2String(packet), JsonUtils.object2String(clientSignalAttachment)));
throw new NetTimeOutException(StringUtils.format("syncAsk timeout exception, ask:[{}], attachment:[{}]", JsonUtils.object2String(packet), JsonUtils.object2String(clientSignalAttachment)));
} finally {
SignalBridge.removeSignalAttachment(clientSignalAttachment);
}
@@ -233,11 +226,9 @@ public class Router implements IRouter {
@Override
public <T extends IPacket> AsyncAnswer<T> asyncAsk(Session session, IPacket packet, @Nullable Class<T> answerClass, @Nullable Object argument) {
var clientSignalAttachment = new SignalAttachment();
// 因此第3个参数传null,会得到一个随机的值,在得到结果回调时,是随机的线程
// 这个值用于返回时,在CompletableFuture中选择哪个线程执行,也就是回到哪个线程
var executorConsistentHash = (argument == null) ? RandomUtils.randomInt() : HashUtils.fnvHash(argument);
var taskExecutorHash = TaskBus.calTaskExecutorHash(argument);
clientSignalAttachment.setExecutorConsistentHash(executorConsistentHash);
clientSignalAttachment.setTaskExecutorHash(taskExecutorHash);
// 服务器在同步或异步的消息处理中,又调用了同步或异步的方法,这时候threadReceiverAttachment不为空
var serverSignalAttachment = serverReceiveSignalAttachmentThreadLocal.get();
@@ -248,8 +239,7 @@ public class Router implements IRouter {
clientSignalAttachment.getResponseFuture()
// 因此超时的情况,返回的是null
.completeOnTimeout(null, DEFAULT_TIMEOUT, TimeUnit.MILLISECONDS)
.thenApply(answer -> {
.completeOnTimeout(null, DEFAULT_TIMEOUT, TimeUnit.MILLISECONDS).thenApply(answer -> {
if (answer == null) {
throw new NetTimeOutException(StringUtils.format("async ask [{}] timeout exception", packet.getClass().getSimpleName()));
}
@@ -262,8 +252,7 @@ public class Router implements IRouter {
throw new UnexpectedProtocolException("client expect protocol:[{}], but found protocol:[{}]", answerClass, answer.getClass().getName());
}
return answer;
})
.whenCompleteAsync((answer, throwable) -> {
}).whenCompleteAsync((answer, throwable) -> {
// 注意:进入这个方法的时机是:在上面的receive方法中,由于是asyncAsk的消息,attachment不为空,会调用CompletableFuture的complete方法
try {
SignalBridge.removeSignalAttachment(clientSignalAttachment);
@@ -14,7 +14,6 @@ package com.zfoo.net.router.attachment;
import com.zfoo.net.session.model.AttributeType;
import com.zfoo.net.session.model.Session;
import com.zfoo.util.math.HashUtils;
import org.springframework.lang.Nullable;
/**
@@ -23,7 +22,7 @@ import org.springframework.lang.Nullable;
*/
public class GatewayAttachment implements IAttachment {
public static final transient short PROTOCOL_ID = 1;
public static final short PROTOCOL_ID = 1;
/**
* session id
@@ -38,12 +37,8 @@ public class GatewayAttachment implements IAttachment {
*/
private long uid;
/**
* EN:Whether to use a consistent hash ID as a consistent hash ID
* CN:是否使用consistentHashId作为一致性hashId
*/
private boolean useExecutorConsistentHash;
private int executorConsistentHash;
private boolean useTaskExecutorHashParam;
private int taskExecutorHashParam;
/**
* true for the client, false for the server
@@ -81,12 +76,8 @@ public class GatewayAttachment implements IAttachment {
}
@Override
public int executorConsistentHash() {
if (useExecutorConsistentHash) {
return executorConsistentHash;
} else {
return HashUtils.fnvHash(uid);
}
public int taskExecutorHash() {
return useTaskExecutorHashParam ? taskExecutorHashParam : (int) uid;
}
@Override
@@ -94,9 +85,9 @@ public class GatewayAttachment implements IAttachment {
return PROTOCOL_ID;
}
public void useExecutorConsistentHash(Object argument) {
this.useExecutorConsistentHash = true;
this.executorConsistentHash = HashUtils.fnvHash(argument);
public void wrapTaskExecutorHash(Object argument) {
this.useTaskExecutorHashParam = true;
this.taskExecutorHashParam = argument.hashCode();
}
public long getSid() {
@@ -115,20 +106,20 @@ public class GatewayAttachment implements IAttachment {
this.uid = uid;
}
public boolean isUseExecutorConsistentHash() {
return useExecutorConsistentHash;
public boolean isUseTaskExecutorHashParam() {
return useTaskExecutorHashParam;
}
public void setUseExecutorConsistentHash(boolean useExecutorConsistentHash) {
this.useExecutorConsistentHash = useExecutorConsistentHash;
public void setUseTaskExecutorHashParam(boolean useTaskExecutorHashParam) {
this.useTaskExecutorHashParam = useTaskExecutorHashParam;
}
public int getExecutorConsistentHash() {
return executorConsistentHash;
public int getTaskExecutorHashParam() {
return taskExecutorHashParam;
}
public void setExecutorConsistentHash(int executorConsistentHash) {
this.executorConsistentHash = executorConsistentHash;
public void setTaskExecutorHashParam(int taskExecutorHashParam) {
this.taskExecutorHashParam = taskExecutorHashParam;
}
public boolean isClient() {
@@ -12,7 +12,6 @@
package com.zfoo.net.router.attachment;
import com.zfoo.util.math.HashUtils;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.HttpResponseStatus;
@@ -22,13 +21,13 @@ import io.netty.handler.codec.http.HttpResponseStatus;
*/
public class HttpAttachment implements IAttachment {
public static final transient short PROTOCOL_ID = 3;
public static final short PROTOCOL_ID = 3;
private long uid;
private boolean useExecutorConsistentHash;
private boolean useTaskExecutorHashParam;
private int executorConsistentHash;
private int taskExecutorHashParam;
private transient FullHttpRequest fullHttpRequest;
@@ -47,12 +46,8 @@ public class HttpAttachment implements IAttachment {
}
@Override
public int executorConsistentHash() {
if (useExecutorConsistentHash) {
return executorConsistentHash;
} else {
return HashUtils.fnvHash(uid);
}
public int taskExecutorHash() {
return useTaskExecutorHashParam ? taskExecutorHashParam : (int) uid;
}
@Override
@@ -60,9 +55,9 @@ public class HttpAttachment implements IAttachment {
return PROTOCOL_ID;
}
public void useExecutorConsistentHash(Object argument) {
this.useExecutorConsistentHash = true;
this.executorConsistentHash = HashUtils.fnvHash(argument);
public void wrapTaskExecutorHash(Object argument) {
this.useTaskExecutorHashParam = true;
this.taskExecutorHashParam = argument.hashCode();
}
public long getUid() {
@@ -73,20 +68,20 @@ public class HttpAttachment implements IAttachment {
this.uid = uid;
}
public boolean isUseExecutorConsistentHash() {
return useExecutorConsistentHash;
public boolean isUseTaskExecutorHashParam() {
return useTaskExecutorHashParam;
}
public void setUseExecutorConsistentHash(boolean useExecutorConsistentHash) {
this.useExecutorConsistentHash = useExecutorConsistentHash;
public void setUseTaskExecutorHashParam(boolean useTaskExecutorHashParam) {
this.useTaskExecutorHashParam = useTaskExecutorHashParam;
}
public int getExecutorConsistentHash() {
return executorConsistentHash;
public int getTaskExecutorHashParam() {
return taskExecutorHashParam;
}
public void setExecutorConsistentHash(int executorConsistentHash) {
this.executorConsistentHash = executorConsistentHash;
public void setTaskExecutorHashParam(int taskExecutorHashParam) {
this.taskExecutorHashParam = taskExecutorHashParam;
}
public FullHttpRequest getFullHttpRequest() {
@@ -25,9 +25,7 @@ public interface IAttachment extends IPacket {
/**
* EN:Used to determine which thread the message is processed on
* CN:用来确定这条消息在哪一个线程处理
*
* @return 一致性hashId
*/
int executorConsistentHash();
int taskExecutorHash();
}
@@ -20,13 +20,13 @@ package com.zfoo.net.router.attachment;
*/
public class NoAnswerAttachment implements IAttachment {
public static final transient short PROTOCOL_ID = 4;
public static final short PROTOCOL_ID = 4;
private int executorConsistentHash;
private int taskExecutorHash;
public static NoAnswerAttachment valueOf(int executorConsistentHash) {
public static NoAnswerAttachment valueOf(int taskExecutorHash) {
var attachment = new NoAnswerAttachment();
attachment.executorConsistentHash = executorConsistentHash;
attachment.taskExecutorHash = taskExecutorHash;
return attachment;
}
@@ -36,8 +36,8 @@ public class NoAnswerAttachment implements IAttachment {
}
@Override
public int executorConsistentHash() {
return executorConsistentHash;
public int taskExecutorHash() {
return taskExecutorHash;
}
@Override
@@ -45,12 +45,11 @@ public class NoAnswerAttachment implements IAttachment {
return PROTOCOL_ID;
}
public int getExecutorConsistentHash() {
return executorConsistentHash;
public int getTaskExecutorHash() {
return taskExecutorHash;
}
public void setExecutorConsistentHash(int executorConsistentHash) {
this.executorConsistentHash = executorConsistentHash;
public void setTaskExecutorHash(int taskExecutorHash) {
this.taskExecutorHash = taskExecutorHash;
}
}
@@ -24,7 +24,7 @@ import java.util.concurrent.atomic.AtomicInteger;
*/
public class SignalAttachment implements IAttachment {
public static final transient short PROTOCOL_ID = 0;
public static final short PROTOCOL_ID = 0;
public static final AtomicInteger ATOMIC_ID = new AtomicInteger(0);
@@ -35,10 +35,10 @@ public class SignalAttachment implements IAttachment {
private int signalId = ATOMIC_ID.incrementAndGet();
/**
* EN:The parameter used to calculate the consistency hash in Task Bus
* CN:用来在TaskBus中计算一致性hash的参数
* EN:The parameter used to calculate the hash in TaskBus to determine which thread the task is executed on
* CN:用来在TaskBus中计算hash的参数,用来决定任务在哪一条线程执行
*/
private int executorConsistentHash = -1;
private int taskExecutorHash = -1;
/**
* true for the client, false for the server
@@ -66,8 +66,8 @@ public class SignalAttachment implements IAttachment {
}
@Override
public int executorConsistentHash() {
return executorConsistentHash;
public int taskExecutorHash() {
return taskExecutorHash;
}
public long getTimestamp() {
@@ -109,12 +109,12 @@ public class SignalAttachment implements IAttachment {
this.signalId = signalId;
}
public int getExecutorConsistentHash() {
return executorConsistentHash;
public int getTaskExecutorHash() {
return taskExecutorHash;
}
public void setExecutorConsistentHash(int executorConsistentHash) {
this.executorConsistentHash = executorConsistentHash;
public void setTaskExecutorHash(int taskExecutorHash) {
this.taskExecutorHash = taskExecutorHash;
}
public boolean isClient() {
@@ -20,7 +20,7 @@ import com.zfoo.util.math.RandomUtils;
*/
public class UdpAttachment implements IAttachment {
public static final transient short PROTOCOL_ID = 2;
public static final short PROTOCOL_ID = 2;
private String host;
private int port;
@@ -38,7 +38,7 @@ public class UdpAttachment implements IAttachment {
}
@Override
public int executorConsistentHash() {
public int taskExecutorHash() {
return RandomUtils.randomInt();
}
@@ -97,17 +97,17 @@ public final class TaskBus {
* <p>
* zfoo中通过对线程池的细粒度控制,从而实现了Actor模型。
* 为了简单,可以把Actor可以理解为一个用户或者一个玩家。
* 因为同一个用户或者玩家的uid是固定的,通过uid去计算一致性hash(executorConsistentHash)永远会得到一致的结果,
* 从而保证同一个用户或者玩家的请求总能通过executorConsistentHash被路由到同一台服务器的同一个线程去执行,从而避免了并发,实现了无锁化。
* 因为同一个用户或者玩家的uid是固定的,通过uid去计算一致性hash(taskExecutorHash)永远会得到一致的结果,
* 从而保证同一个用户或者玩家的请求总能通过taskExecutorHash被路由到同一台服务器的同一个线程去执行,从而避免了并发,实现了无锁化。
* <p>
* zfoo所代表的Actor模型,是更加精简的Actor模型,让上层调用无感知,在zfoo中可以简单的理解 actor = executorConsistentHash。
* zfoo所代表的Actor模型,是更加精简的Actor模型,让上层调用无感知,在zfoo中可以简单的理解 actor = taskExecutorHash。
* <p>
* 在zfoo这套线程模型中,保证了服务器所接收到的Packet(最终被包装成PacketReceiverTask任务),永远只会在同一条线程处理,
* TaskBus通过AbstractTaskDispatch去派发PacketReceiverTask任务,具体在哪个线程处理通过IAttachment的executorConsistentHash计算。
* TaskBus通过AbstractTaskDispatch去派发PacketReceiverTask任务,具体在哪个线程处理通过IAttachment的taskExecutorHash计算。
* <p>
* IAttachment的不同,executorConsistentHash也不同:
* GatewayAttachment:默认是executorConsistentHash等于用户活玩家的uid,也可以通过IGatewayLoadBalancer接口指定
* SignalAttachmentexecutorConsistentHash通过IRouter和IConsumer的argument参数指定
* IAttachment的不同,taskExecutorHash也不同:
* GatewayAttachment:默认是taskExecutorHash等于用户活玩家的uid,也可以通过IGatewayLoadBalancer接口指定
* SignalAttachmenttaskExecutorHash通过IRouter和IConsumer的argument参数指定
*/
public static void dispatch(PacketReceiverTask task) {
var attachment = task.getAttachment();
@@ -121,16 +121,24 @@ public final class TaskBus {
execute((int) uid, task);
}
} else {
execute(attachment.executorConsistentHash(), task);
execute(attachment.taskExecutorHash(), task);
}
}
public static int executorIndex(int executorConsistentHash) {
return Math.abs(executorConsistentHash % EXECUTOR_SIZE);
public static int calTaskExecutorHash(int taskExecutorHash) {
return Math.abs(taskExecutorHash) % EXECUTOR_SIZE;
}
public static void execute(int executorConsistentHash, Runnable runnable) {
executors[executorIndex(executorConsistentHash)].execute(SafeRunnable.valueOf(runnable));
public static int calTaskExecutorHash(Object argument) {
return calTaskExecutorHash((argument == null) ? RandomUtils.randomInt() : argument.hashCode());
}
public static void execute(int taskExecutorHash, Runnable runnable) {
executors[calTaskExecutorHash(taskExecutorHash)].execute(SafeRunnable.valueOf(runnable));
}
public static void execute(Object argument, Runnable runnable) {
execute(calTaskExecutorHash(argument), runnable);
}
// 在taskeventscheduler线程执行的异步请求,请求成功过后依然在相同的线程执行回调任务
@@ -151,7 +159,7 @@ public final class TaskBus {
return schedulerExecutor;
}
return executors[executorIndex(RandomUtils.randomInt())];
return executors[calTaskExecutorHash(RandomUtils.randomInt())];
}
}