ref[net]: refactor attachment

This commit is contained in:
godotg
2023-09-10 14:04:42 +08:00
parent 68dc4f3735
commit 2a552eebea
6 changed files with 50 additions and 88 deletions
@@ -88,7 +88,7 @@ public class GatewayRouteHandler extends ServerRouteHandler {
// 例子:以聊天服务来说,玩家知道自己在哪个群组groupId中,那往这个群发送消息时,会在Packet中带上这个groupId做为一致性hash就可以了。
if (packet instanceof IGatewayLoadBalancer) {
var loadBalancerConsistentHashObject = ((IGatewayLoadBalancer) packet).loadBalancerConsistentHashObject();
gatewayAttachment.wrapTaskExecutorHash(loadBalancerConsistentHashObject);
gatewayAttachment.setTaskExecutorHash(loadBalancerConsistentHashObject.hashCode());
forwardingPacket(packet, gatewayAttachment, loadBalancerConsistentHashObject);
return;
} else {
@@ -25,6 +25,7 @@ import com.zfoo.net.packet.common.Heartbeat;
import com.zfoo.net.router.answer.AsyncAnswer;
import com.zfoo.net.router.answer.SyncAnswer;
import com.zfoo.net.router.attachment.GatewayAttachment;
import com.zfoo.net.router.attachment.HttpAttachment;
import com.zfoo.net.router.attachment.SignalAttachment;
import com.zfoo.net.router.exception.ErrorResponseException;
import com.zfoo.net.router.exception.NetTimeOutException;
@@ -74,10 +75,11 @@ public class Router implements IRouter {
return;
}
var task = new PacketReceiverTask(session, packet, attachment);
if (attachment == null) {
// 正常发送消息的接收,把客户端的业务请求包装下到路由策略指定的线程进行业务处理
// 注意:像客户端以asyncAsk发送请求,在服务器处理完后返回结果,在请求方也是进入这个receive方法,但是attachment不为空,会提前return掉不会走到这
TaskBus.dispatch(new PacketReceiverTask(session, packet, null));
TaskBus.dispatchBySession(task);
return;
}
@@ -85,12 +87,15 @@ public class Router implements IRouter {
if (attachment.getClass() == SignalAttachment.class) {
var signalAttachment = (SignalAttachment) attachment;
if (signalAttachment.isClient()) {
if (signalAttachment.getClient() == SignalAttachment.SIGNAL_OUTSIDE_CLIENT) {
// 服务器收到外部客户端的SIGNAL_OUTSIDE_CLIENT,不做任何处理
TaskBus.dispatchBySession(task);
} else if (signalAttachment.getClient() == SignalAttachment.SIGNAL_NATIVE_CLIENT) {
// 服务器收到signalAttachment,不做任何处理
signalAttachment.setClient(false);
TaskBus.dispatch(new PacketReceiverTask(session, packet, attachment));
signalAttachment.setClient(SignalAttachment.SIGNAL_SERVER);
TaskBus.dispatchByTaskExecutorHash(signalAttachment.getTaskExecutorHash(), task);
} else {
// 客户端收到服务器应答,客户端发送的时候isClient为true,服务器收到的时候将其设置为false
// 客户端收到服务器应答,客户端发送的时候client为SIGNAL_NATIVE_CLIENT,服务器收到的时候将其设置为SIGNAL_SERVER
var removedAttachment = (SignalAttachment) SignalBridge.removeSignalAttachment(signalAttachment);
if (removedAttachment == null) {
logger.error("client receives packet:[{}] and attachment:[{}] from server, but clientAttachmentMap has no attachment, perhaps timeout exception.", JsonUtils.object2String(packet), JsonUtils.object2String(attachment));
@@ -109,7 +114,7 @@ public class Router implements IRouter {
// 注意:此时并没有return,这样子网关的消息才能发给home,在home进行处理LogoutRequest消息的处理
if (gatewayAttachment.isClient()) {
gatewayAttachment.setClient(false);
TaskBus.dispatch(new PacketReceiverTask(session, packet, attachment));
TaskBus.dispatchByTaskExecutorHash(gatewayAttachment.getTaskExecutorHash(), task);
} else {
// 这里是:别的服务提供者提供授权给网关,比如:在玩家登录后,home服查到了玩家uid,然后发给Gateway服
var gatewaySession = NetContext.getSessionManager().getServerSession(gatewayAttachment.getSid());
@@ -137,7 +142,13 @@ public class Router implements IRouter {
return;
}
TaskBus.dispatch(new PacketReceiverTask(session, packet, attachment));
if (attachment.getClass() == HttpAttachment.class) {
var httpAttachment = (HttpAttachment) attachment;
TaskBus.dispatchByTaskExecutorHash(httpAttachment.getTaskExecutorHash(), task);
return;
}
TaskBus.dispatchBySession(task);
}
@Override
@@ -35,8 +35,11 @@ public class GatewayAttachment {
*/
private long uid;
private boolean useTaskExecutorHashParam;
private int taskExecutorHashParam;
/**
* EN:Used to determine which thread the message is processed on
* CN:用来确定这条消息在哪一个线程处理
*/
private int taskExecutorHash;
/**
* true for the client, false for the server
@@ -71,13 +74,9 @@ public class GatewayAttachment {
* CN:用来确定这条消息在哪一个线程处理
*/
public int taskExecutorHash() {
return useTaskExecutorHashParam ? taskExecutorHashParam : (int) uid;
return taskExecutorHash == 0 ? (int) uid : taskExecutorHash;
}
public void wrapTaskExecutorHash(Object argument) {
this.useTaskExecutorHashParam = true;
this.taskExecutorHashParam = argument.hashCode();
}
public long getSid() {
return sid;
@@ -95,20 +94,12 @@ public class GatewayAttachment {
this.uid = uid;
}
public boolean isUseTaskExecutorHashParam() {
return useTaskExecutorHashParam;
public int getTaskExecutorHash() {
return taskExecutorHash;
}
public void setUseTaskExecutorHashParam(boolean useTaskExecutorHashParam) {
this.useTaskExecutorHashParam = useTaskExecutorHashParam;
}
public int getTaskExecutorHashParam() {
return taskExecutorHashParam;
}
public void setTaskExecutorHashParam(int taskExecutorHashParam) {
this.taskExecutorHashParam = taskExecutorHashParam;
public void setTaskExecutorHash(int taskExecutorHash) {
this.taskExecutorHash = taskExecutorHash;
}
public boolean isClient() {
@@ -25,9 +25,7 @@ public class HttpAttachment {
private long uid;
private boolean useTaskExecutorHashParam;
private int taskExecutorHashParam;
private int taskExecutorHash;
private transient FullHttpRequest fullHttpRequest;
@@ -46,12 +44,7 @@ public class HttpAttachment {
* CN:用来确定这条消息在哪一个线程处理
*/
public int taskExecutorHash() {
return useTaskExecutorHashParam ? taskExecutorHashParam : (int) uid;
}
public void wrapTaskExecutorHash(Object argument) {
this.useTaskExecutorHashParam = true;
this.taskExecutorHashParam = argument.hashCode();
return taskExecutorHash == 0 ? (int) uid : taskExecutorHash;
}
public long getUid() {
@@ -62,20 +55,12 @@ public class HttpAttachment {
this.uid = uid;
}
public boolean isUseTaskExecutorHashParam() {
return useTaskExecutorHashParam;
public int getTaskExecutorHash() {
return taskExecutorHash;
}
public void setUseTaskExecutorHashParam(boolean useTaskExecutorHashParam) {
this.useTaskExecutorHashParam = useTaskExecutorHashParam;
}
public int getTaskExecutorHashParam() {
return taskExecutorHashParam;
}
public void setTaskExecutorHashParam(int taskExecutorHashParam) {
this.taskExecutorHashParam = taskExecutorHashParam;
public void setTaskExecutorHash(int taskExecutorHash) {
this.taskExecutorHash = taskExecutorHash;
}
public FullHttpRequest getFullHttpRequest() {
@@ -31,6 +31,12 @@ public class SignalAttachment {
* CN:允许负数的signalId
*/
public static final AtomicInteger ATOMIC_ID = new AtomicInteger(0);
/**
* 0 for the server, 1 for the sync or async native client, 2 for the outside client such as browser, mobile
*/
public static final byte SIGNAL_SERVER = 0;
public static final byte SIGNAL_NATIVE_CLIENT = 1;
public static final byte SIGNAL_OUTSIDE_CLIENT = 2;
/**
* EN:Unique identification of a packet, unique representation of an attachment, hashcode() and equals() equals signalId value
@@ -45,12 +51,12 @@ public class SignalAttachment {
private int taskExecutorHash = -1;
/**
* true for the client, false for the server
* 0 for the server, 1 for the sync or async native client, 2 for the outside client such as browser, mobile
*/
private boolean client = true;
private byte client = SIGNAL_NATIVE_CLIENT;
/**
* The time the client sent it
* The timestamp the client sent it
*/
private long timestamp = TimeUtils.now();
@@ -64,18 +70,6 @@ public class SignalAttachment {
public SignalAttachment() {
}
public AttachmentType packetType() {
return AttachmentType.SIGNAL_PACKET;
}
/**
* EN:Used to determine which thread the message is processed on
* CN:用来确定这条消息在哪一个线程处理
*/
public int taskExecutorHash() {
return taskExecutorHash;
}
public long getTimestamp() {
return timestamp;
}
@@ -118,15 +112,14 @@ public class SignalAttachment {
this.taskExecutorHash = taskExecutorHash;
}
public boolean isClient() {
public byte getClient() {
return client;
}
public void setClient(boolean client) {
public void setClient(byte client) {
this.client = client;
}
public CompletableFuture<Object> getResponseFuture() {
return responseFuture;
}
@@ -18,7 +18,6 @@ import com.zfoo.net.NetContext;
import com.zfoo.net.router.attachment.GatewayAttachment;
import com.zfoo.net.router.attachment.HttpAttachment;
import com.zfoo.net.router.attachment.SignalAttachment;
import com.zfoo.net.session.Session;
import com.zfoo.protocol.collection.concurrent.CopyOnWriteHashMapLongObject;
import com.zfoo.protocol.util.AssertionUtils;
import com.zfoo.protocol.util.RandomUtils;
@@ -115,16 +114,8 @@ public final class TaskBus {
* GatewayAttachment:默认是taskExecutorHash等于用户活玩家的uid,也可以通过IGatewayLoadBalancer接口指定
* SignalAttachmenttaskExecutorHash通过IRouter和IConsumer的argument参数指定
*/
public static void dispatch(PacketReceiverTask task) {
var attachment = task.getAttachment();
if (attachment == null) {
dispatchBySession(task.getSession(), task);
} else {
dispatchByAttachment(attachment, task);
}
}
private static void dispatchBySession(Session session, PacketReceiverTask task) {
public static void dispatchBySession(PacketReceiverTask task) {
var session = task.getSession();
var uid = session.getUid();
if (uid > 0) {
execute((int) uid, task);
@@ -133,17 +124,8 @@ public final class TaskBus {
}
}
private static void dispatchByAttachment(Object attachment, PacketReceiverTask task) {
var attachmentClass = attachment.getClass();
if (attachmentClass == SignalAttachment.class) {
execute(((SignalAttachment) attachment).taskExecutorHash(), task);
} else if (attachmentClass == GatewayAttachment.class) {
execute(((GatewayAttachment) attachment).taskExecutorHash(), task);
} else if (attachmentClass == HttpAttachment.class) {
execute(((HttpAttachment) attachment).taskExecutorHash(), task);
} else {
dispatchBySession(task.getSession(), task);
}
public static void dispatchByTaskExecutorHash(int taskExecutorHash, PacketReceiverTask task) {
execute(taskExecutorHash, task);
}
public static int calTaskExecutorHash(int taskExecutorHash) {