diff --git a/net/src/main/java/com/zfoo/net/handler/GatewayRouteHandler.java b/net/src/main/java/com/zfoo/net/handler/GatewayRouteHandler.java index 29f2f4a5..bdabcd94 100644 --- a/net/src/main/java/com/zfoo/net/handler/GatewayRouteHandler.java +++ b/net/src/main/java/com/zfoo/net/handler/GatewayRouteHandler.java @@ -88,7 +88,7 @@ public class GatewayRouteHandler extends ServerRouteHandler { // 例子:以聊天服务来说,玩家知道自己在哪个群组groupId中,那往这个群发送消息时,会在Packet中带上这个groupId做为一致性hash就可以了。 if (packet instanceof IGatewayLoadBalancer) { var loadBalancerConsistentHashObject = ((IGatewayLoadBalancer) packet).loadBalancerConsistentHashObject(); - gatewayAttachment.wrapTaskExecutorHash(loadBalancerConsistentHashObject); + gatewayAttachment.setTaskExecutorHash(loadBalancerConsistentHashObject.hashCode()); forwardingPacket(packet, gatewayAttachment, loadBalancerConsistentHashObject); return; } else { diff --git a/net/src/main/java/com/zfoo/net/router/Router.java b/net/src/main/java/com/zfoo/net/router/Router.java index e36d45bf..ce0e659b 100644 --- a/net/src/main/java/com/zfoo/net/router/Router.java +++ b/net/src/main/java/com/zfoo/net/router/Router.java @@ -25,6 +25,7 @@ import com.zfoo.net.packet.common.Heartbeat; import com.zfoo.net.router.answer.AsyncAnswer; import com.zfoo.net.router.answer.SyncAnswer; import com.zfoo.net.router.attachment.GatewayAttachment; +import com.zfoo.net.router.attachment.HttpAttachment; import com.zfoo.net.router.attachment.SignalAttachment; import com.zfoo.net.router.exception.ErrorResponseException; import com.zfoo.net.router.exception.NetTimeOutException; @@ -74,10 +75,11 @@ public class Router implements IRouter { return; } + var task = new PacketReceiverTask(session, packet, attachment); if (attachment == null) { // 正常发送消息的接收,把客户端的业务请求包装下到路由策略指定的线程进行业务处理 // 注意:像客户端以asyncAsk发送请求,在服务器处理完后返回结果,在请求方也是进入这个receive方法,但是attachment不为空,会提前return掉不会走到这 - TaskBus.dispatch(new PacketReceiverTask(session, packet, null)); + TaskBus.dispatchBySession(task); return; } @@ -85,12 +87,15 @@ public class Router implements IRouter { if (attachment.getClass() == SignalAttachment.class) { var signalAttachment = (SignalAttachment) attachment; - if (signalAttachment.isClient()) { + if (signalAttachment.getClient() == SignalAttachment.SIGNAL_OUTSIDE_CLIENT) { + // 服务器收到外部客户端的SIGNAL_OUTSIDE_CLIENT,不做任何处理 + TaskBus.dispatchBySession(task); + } else if (signalAttachment.getClient() == SignalAttachment.SIGNAL_NATIVE_CLIENT) { // 服务器收到signalAttachment,不做任何处理 - signalAttachment.setClient(false); - TaskBus.dispatch(new PacketReceiverTask(session, packet, attachment)); + signalAttachment.setClient(SignalAttachment.SIGNAL_SERVER); + TaskBus.dispatchByTaskExecutorHash(signalAttachment.getTaskExecutorHash(), task); } else { - // 客户端收到服务器应答,客户端发送的时候isClient为true,服务器收到的时候将其设置为false + // 客户端收到服务器应答,客户端发送的时候client为SIGNAL_NATIVE_CLIENT,服务器收到的时候将其设置为SIGNAL_SERVER var removedAttachment = (SignalAttachment) SignalBridge.removeSignalAttachment(signalAttachment); if (removedAttachment == null) { logger.error("client receives packet:[{}] and attachment:[{}] from server, but clientAttachmentMap has no attachment, perhaps timeout exception.", JsonUtils.object2String(packet), JsonUtils.object2String(attachment)); @@ -109,7 +114,7 @@ public class Router implements IRouter { // 注意:此时并没有return,这样子网关的消息才能发给home,在home进行处理LogoutRequest消息的处理 if (gatewayAttachment.isClient()) { gatewayAttachment.setClient(false); - TaskBus.dispatch(new PacketReceiverTask(session, packet, attachment)); + TaskBus.dispatchByTaskExecutorHash(gatewayAttachment.getTaskExecutorHash(), task); } else { // 这里是:别的服务提供者提供授权给网关,比如:在玩家登录后,home服查到了玩家uid,然后发给Gateway服 var gatewaySession = NetContext.getSessionManager().getServerSession(gatewayAttachment.getSid()); @@ -137,7 +142,13 @@ public class Router implements IRouter { return; } - TaskBus.dispatch(new PacketReceiverTask(session, packet, attachment)); + if (attachment.getClass() == HttpAttachment.class) { + var httpAttachment = (HttpAttachment) attachment; + TaskBus.dispatchByTaskExecutorHash(httpAttachment.getTaskExecutorHash(), task); + return; + } + + TaskBus.dispatchBySession(task); } @Override diff --git a/net/src/main/java/com/zfoo/net/router/attachment/GatewayAttachment.java b/net/src/main/java/com/zfoo/net/router/attachment/GatewayAttachment.java index 3dffe543..f93d51dc 100644 --- a/net/src/main/java/com/zfoo/net/router/attachment/GatewayAttachment.java +++ b/net/src/main/java/com/zfoo/net/router/attachment/GatewayAttachment.java @@ -35,8 +35,11 @@ public class GatewayAttachment { */ private long uid; - private boolean useTaskExecutorHashParam; - private int taskExecutorHashParam; + /** + * EN:Used to determine which thread the message is processed on + * CN:用来确定这条消息在哪一个线程处理 + */ + private int taskExecutorHash; /** * true for the client, false for the server @@ -71,13 +74,9 @@ public class GatewayAttachment { * CN:用来确定这条消息在哪一个线程处理 */ public int taskExecutorHash() { - return useTaskExecutorHashParam ? taskExecutorHashParam : (int) uid; + return taskExecutorHash == 0 ? (int) uid : taskExecutorHash; } - public void wrapTaskExecutorHash(Object argument) { - this.useTaskExecutorHashParam = true; - this.taskExecutorHashParam = argument.hashCode(); - } public long getSid() { return sid; @@ -95,20 +94,12 @@ public class GatewayAttachment { this.uid = uid; } - public boolean isUseTaskExecutorHashParam() { - return useTaskExecutorHashParam; + public int getTaskExecutorHash() { + return taskExecutorHash; } - public void setUseTaskExecutorHashParam(boolean useTaskExecutorHashParam) { - this.useTaskExecutorHashParam = useTaskExecutorHashParam; - } - - public int getTaskExecutorHashParam() { - return taskExecutorHashParam; - } - - public void setTaskExecutorHashParam(int taskExecutorHashParam) { - this.taskExecutorHashParam = taskExecutorHashParam; + public void setTaskExecutorHash(int taskExecutorHash) { + this.taskExecutorHash = taskExecutorHash; } public boolean isClient() { diff --git a/net/src/main/java/com/zfoo/net/router/attachment/HttpAttachment.java b/net/src/main/java/com/zfoo/net/router/attachment/HttpAttachment.java index fda1f196..46261930 100644 --- a/net/src/main/java/com/zfoo/net/router/attachment/HttpAttachment.java +++ b/net/src/main/java/com/zfoo/net/router/attachment/HttpAttachment.java @@ -25,9 +25,7 @@ public class HttpAttachment { private long uid; - private boolean useTaskExecutorHashParam; - - private int taskExecutorHashParam; + private int taskExecutorHash; private transient FullHttpRequest fullHttpRequest; @@ -46,12 +44,7 @@ public class HttpAttachment { * CN:用来确定这条消息在哪一个线程处理 */ public int taskExecutorHash() { - return useTaskExecutorHashParam ? taskExecutorHashParam : (int) uid; - } - - public void wrapTaskExecutorHash(Object argument) { - this.useTaskExecutorHashParam = true; - this.taskExecutorHashParam = argument.hashCode(); + return taskExecutorHash == 0 ? (int) uid : taskExecutorHash; } public long getUid() { @@ -62,20 +55,12 @@ public class HttpAttachment { this.uid = uid; } - public boolean isUseTaskExecutorHashParam() { - return useTaskExecutorHashParam; + public int getTaskExecutorHash() { + return taskExecutorHash; } - public void setUseTaskExecutorHashParam(boolean useTaskExecutorHashParam) { - this.useTaskExecutorHashParam = useTaskExecutorHashParam; - } - - public int getTaskExecutorHashParam() { - return taskExecutorHashParam; - } - - public void setTaskExecutorHashParam(int taskExecutorHashParam) { - this.taskExecutorHashParam = taskExecutorHashParam; + public void setTaskExecutorHash(int taskExecutorHash) { + this.taskExecutorHash = taskExecutorHash; } public FullHttpRequest getFullHttpRequest() { diff --git a/net/src/main/java/com/zfoo/net/router/attachment/SignalAttachment.java b/net/src/main/java/com/zfoo/net/router/attachment/SignalAttachment.java index 6cc9b0dc..015d2dff 100644 --- a/net/src/main/java/com/zfoo/net/router/attachment/SignalAttachment.java +++ b/net/src/main/java/com/zfoo/net/router/attachment/SignalAttachment.java @@ -31,6 +31,12 @@ public class SignalAttachment { * CN:允许负数的signalId */ public static final AtomicInteger ATOMIC_ID = new AtomicInteger(0); + /** + * 0 for the server, 1 for the sync or async native client, 2 for the outside client such as browser, mobile + */ + public static final byte SIGNAL_SERVER = 0; + public static final byte SIGNAL_NATIVE_CLIENT = 1; + public static final byte SIGNAL_OUTSIDE_CLIENT = 2; /** * EN:Unique identification of a packet, unique representation of an attachment, hashcode() and equals() equals signalId value @@ -45,12 +51,12 @@ public class SignalAttachment { private int taskExecutorHash = -1; /** - * true for the client, false for the server + * 0 for the server, 1 for the sync or async native client, 2 for the outside client such as browser, mobile */ - private boolean client = true; + private byte client = SIGNAL_NATIVE_CLIENT; /** - * The time the client sent it + * The timestamp the client sent it */ private long timestamp = TimeUtils.now(); @@ -64,18 +70,6 @@ public class SignalAttachment { public SignalAttachment() { } - public AttachmentType packetType() { - return AttachmentType.SIGNAL_PACKET; - } - - /** - * EN:Used to determine which thread the message is processed on - * CN:用来确定这条消息在哪一个线程处理 - */ - public int taskExecutorHash() { - return taskExecutorHash; - } - public long getTimestamp() { return timestamp; } @@ -118,15 +112,14 @@ public class SignalAttachment { this.taskExecutorHash = taskExecutorHash; } - public boolean isClient() { + public byte getClient() { return client; } - public void setClient(boolean client) { + public void setClient(byte client) { this.client = client; } - public CompletableFuture getResponseFuture() { return responseFuture; } diff --git a/net/src/main/java/com/zfoo/net/task/TaskBus.java b/net/src/main/java/com/zfoo/net/task/TaskBus.java index 3a79e90c..7cad0317 100644 --- a/net/src/main/java/com/zfoo/net/task/TaskBus.java +++ b/net/src/main/java/com/zfoo/net/task/TaskBus.java @@ -18,7 +18,6 @@ import com.zfoo.net.NetContext; import com.zfoo.net.router.attachment.GatewayAttachment; import com.zfoo.net.router.attachment.HttpAttachment; import com.zfoo.net.router.attachment.SignalAttachment; -import com.zfoo.net.session.Session; import com.zfoo.protocol.collection.concurrent.CopyOnWriteHashMapLongObject; import com.zfoo.protocol.util.AssertionUtils; import com.zfoo.protocol.util.RandomUtils; @@ -115,16 +114,8 @@ public final class TaskBus { * GatewayAttachment:默认是taskExecutorHash等于用户活玩家的uid,也可以通过IGatewayLoadBalancer接口指定 * SignalAttachment:taskExecutorHash通过IRouter和IConsumer的argument参数指定 */ - public static void dispatch(PacketReceiverTask task) { - var attachment = task.getAttachment(); - if (attachment == null) { - dispatchBySession(task.getSession(), task); - } else { - dispatchByAttachment(attachment, task); - } - } - - private static void dispatchBySession(Session session, PacketReceiverTask task) { + public static void dispatchBySession(PacketReceiverTask task) { + var session = task.getSession(); var uid = session.getUid(); if (uid > 0) { execute((int) uid, task); @@ -133,17 +124,8 @@ public final class TaskBus { } } - private static void dispatchByAttachment(Object attachment, PacketReceiverTask task) { - var attachmentClass = attachment.getClass(); - if (attachmentClass == SignalAttachment.class) { - execute(((SignalAttachment) attachment).taskExecutorHash(), task); - } else if (attachmentClass == GatewayAttachment.class) { - execute(((GatewayAttachment) attachment).taskExecutorHash(), task); - } else if (attachmentClass == HttpAttachment.class) { - execute(((HttpAttachment) attachment).taskExecutorHash(), task); - } else { - dispatchBySession(task.getSession(), task); - } + public static void dispatchByTaskExecutorHash(int taskExecutorHash, PacketReceiverTask task) { + execute(taskExecutorHash, task); } public static int calTaskExecutorHash(int taskExecutorHash) {